From 41cae73ee82b9884099367321c517fdd45a0fbab Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Mon, 13 Apr 2026 17:02:21 +0200 Subject: [PATCH 1/4] add Deduplication returning Kstream --- README.md | 125 ++++- .../deduplication/DedupHeadersProcessor.java | 84 ++- .../DedupHeadersProcessorWithErrors.java | 133 +++++ .../deduplication/DedupKeyProcessor.java | 76 ++- .../DedupKeyProcessorWithErrors.java | 97 ++++ .../deduplication/DedupKeyValueProcessor.java | 75 ++- .../DedupKeyValueProcessorWithErrors.java | 98 ++++ .../DedupWithPredicateProcessor.java | 87 ++-- ...DedupWithPredicateProcessorWithErrors.java | 107 ++++ .../deduplication/DeduplicationUtils.java | 479 ++++++++++++++---- .../DedupHeadersProcessorTest.java | 14 +- .../DedupHeadersProcessorWithErrorsTest.java | 128 +++++ .../deduplication/DedupKeyProcessorTest.java | 12 +- .../DedupKeyProcessorWithErrorsTest.java | 119 +++++ .../DedupKeyValueProcessorTest.java | 33 +- .../DedupKeyValueProcessorWithErrorsTest.java | 119 +++++ .../DedupWithPredicateProcessorTest.java | 33 +- ...pWithPredicateProcessorWithErrorsTest.java | 125 +++++ .../initializer/KafkaStreamsStarterTest.java | 6 +- 19 files changed, 1613 insertions(+), 337 deletions(-) create mode 100644 kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrors.java create mode 100644 kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrors.java create mode 100644 kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrors.java create mode 100644 kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrors.java create mode 100644 kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrorsTest.java create mode 100644 kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrorsTest.java create 
mode 100644 kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrorsTest.java create mode 100644 kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrorsTest.java diff --git a/README.md b/README.md index 95705da3..850bd5ae 100644 --- a/README.md +++ b/README.md @@ -808,29 +808,74 @@ For more details about prefixes, see the [Prefix](#prefix) section. The `DeduplicationUtils` class helps you deduplicate streams based on various criteria and within a specified time window. -All deduplication methods return a `KStream`, allowing you to handle errors and route them to `TopologyErrorHandler#catchErrors()`. +All `*withErrors` methods return a `KStream`, allowing you to handle errors and route them to `TopologyErrorHandler#catchErrors()`. +Methods without `*withErrors` return a plain `KStream` and can be used directly. **Note**: Only streams with `String` keys and Avro values are supported. -#### By Key +#### By Key + +##### With error handling ```java @Component public class MyKafkaStreams extends KafkaStreamsStarter { - @Override - public void topology(StreamsBuilder streamsBuilder) { - KStream myStream = streamsBuilder + @Override + public void topology(StreamsBuilder streamsBuilder) { + KStream myStream = streamsBuilder .stream("input_topic"); - DeduplicationUtils - .deduplicateKeys(streamsBuilder, myStream, Duration.ofDays(60)) - .to("output_topic"); - } + TopologyErrorHandler + .catchErrors( + DeduplicationUtils + .distinctKeysWithErrors(streamsBuilder, myStream, Duration.ofDays(60)) + ) + .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes())); + } +} +``` + +##### Without error handling + +```java +@Component +public class MyKafkaStreams extends KafkaStreamsStarter { + @Override + public void topology(StreamsBuilder streamsBuilder) { + KStream myStream = streamsBuilder + .stream("input_topic"); + + DeduplicationUtils + 
.distinctKeys(streamsBuilder, myStream, Duration.ofDays(60)) + .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes())); + } } ``` #### By Key and Value +##### With error handling + +```java +@Component +public class MyKafkaStreams extends KafkaStreamsStarter { + @Override + public void topology(StreamsBuilder streamsBuilder) { + KStream myStream = streamsBuilder + .stream("input_topic"); + + TopologyErrorHandler + .catchErrors( + DeduplicationUtils + .distinctByKeyValuesWithErrors(streamsBuilder, myStream, Duration.ofDays(60)) + ) + .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes())); + } +} +``` + +##### Without error handling + ```java @Component public class MyKafkaStreams extends KafkaStreamsStarter { @@ -840,7 +885,7 @@ public class MyKafkaStreams extends KafkaStreamsStarter { .stream("input_topic"); DeduplicationUtils - .deduplicateKeyValues(streamsBuilder, myStream, Duration.ofDays(60)) + .distinctByKeyValues(streamsBuilder, myStream, Duration.ofDays(60)) .to("output_topic"); } } @@ -848,6 +893,33 @@ public class MyKafkaStreams extends KafkaStreamsStarter { #### By Predicate +##### With error handling + +```java +@Component +public class MyKafkaStreams extends KafkaStreamsStarter { + @Override + public void topology(StreamsBuilder streamsBuilder) { + KStream myStream = streamsBuilder + .stream("input_topic"); + + TopologyErrorHandler + .catchErrors( + DeduplicationUtils + .distinctByPredicateWithErrors( + streamsBuilder, + myStream, + Duration.ofDays(60), + value -> value.getFirstName() + "#" + value.getLastName() + ) + ) + .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes())); + } +} +``` + +##### Without error handling + ```java @Component public class MyKafkaStreams extends KafkaStreamsStarter { @@ -857,7 +929,7 @@ public class MyKafkaStreams extends KafkaStreamsStarter { .stream("input_topic"); DeduplicationUtils - .deduplicateWithPredicate(streamsBuilder,
myStream, Duration.ofDays(60), + .distinctByPredicate(streamsBuilder, myStream, Duration.ofDays(60), value -> value.getFirstName() + "#" + value.getLastName()) .to("output_topic"); } @@ -868,6 +940,35 @@ In the predicate approach, the provided predicate is used as the key in the wind #### By Headers +##### With error handling + +```java +import java.util.List; + +@Component +public class MyKafkaStreams extends KafkaStreamsStarter { + @Override + public void topology(StreamsBuilder streamsBuilder) { + KStream myStream = streamsBuilder + .stream("input_topic"); + + TopologyErrorHandler + .catchErrors( + DeduplicationUtils + .distinctByHeadersWithErrors( + streamsBuilder, + myStream, + Duration.ofDays(60), + List.of("header1", "header2") + ) + ) + .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes())); + } +} +``` + +##### Without error handling + ```java import java.util.List; @@ -879,7 +980,7 @@ public class MyKafkaStreams extends KafkaStreamsStarter { .stream("input_topic"); DeduplicationUtils - .deduplicateWithHeaders(streamsBuilder, myStream, Duration.ofDays(60), + .distinctByHeaders(streamsBuilder, myStream, Duration.ofDays(60), List.of("header1", "header2")) .to("output_topic"); } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java index 2cd124a4..ec0a3839 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java @@ -18,7 +18,6 @@ */ package com.michelin.kstreamplify.deduplication; -import com.michelin.kstreamplify.error.ProcessingResult; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; @@ -34,12 +33,11 @@ import org.apache.kafka.streams.state.WindowStore; /** - * Transformer 
class for the deduplication mechanism on headers of a given topic. + * Processor class for the deduplication mechanism on headers of a given topic. * * @param The type of the value */ -public class DedupHeadersProcessor - implements Processor> { +public class DedupHeadersProcessor implements Processor { /** Window store name, initialized @ construction. */ private final String windowStoreName; @@ -47,8 +45,8 @@ public class DedupHeadersProcessor private final Duration retentionWindowDuration; /** Deduplication headers list. */ private final List deduplicationHeadersList; - /** Kstream context for this transformer. */ - private ProcessorContext> processorContext; + /** Kstream context for this processor. */ + private ProcessorContext processorContext; /** Window store containing all the records seen on the given window. */ private WindowStore dedupWindowStore; @@ -66,41 +64,49 @@ public DedupHeadersProcessor( this.deduplicationHeadersList = deduplicationHeadersList; } + /** + * Get the header value for a given key + * + * @param headers headers of the record + * @param key the key to look for in the headers + * @return The header value for the given key, or an empty string if the header is not present or has no value. 
+ */ + private static String getHeader(Headers headers, String key) { + Header header = headers.lastHeader(key); + if (header == null || header.value() == null) { + return StringUtils.EMPTY; + } + String value = new String(header.value(), StandardCharsets.UTF_8); + return StringUtils.defaultString(value); + } + @Override - public void init(ProcessorContext> context) { + public void init(ProcessorContext context) { this.processorContext = context; dedupWindowStore = this.processorContext.getStateStore(windowStoreName); } @Override public void process(Record message) { - try { - // Get the record timestamp - var currentInstant = Instant.ofEpochMilli(message.timestamp()); - String identifier = buildIdentifier(message.headers()); + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); + String identifier = buildIdentifier(message.headers()); - // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) - try (var resultIterator = dedupWindowStore.backwardFetch( - identifier, - currentInstant.minus(retentionWindowDuration), - currentInstant.plus(retentionWindowDuration))) { - while (resultIterator != null && resultIterator.hasNext()) { - var currentKeyValue = resultIterator.next(); - if (identifier.equals(currentKeyValue.value)) { - return; - } + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + identifier, + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (identifier.equals(currentKeyValue.value)) { + return; } } - // First time we see this record, store entry in the window store and forward the record to the output - dedupWindowStore.put(identifier, identifier, message.timestamp()); - 
processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); - } catch (Exception e) { - processorContext.forward(ProcessingResult.wrapRecordFailure( - e, - message, - "Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform")); } + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(identifier, identifier, message.timestamp()); + processorContext.forward(message); } /** @@ -114,20 +120,4 @@ private String buildIdentifier(Headers headers) { .map(key -> getHeader(headers, key)) .collect(Collectors.joining("#")); } - - /** - * Get the header value for a given key - * - * @param headers headers of the record - * @param key the key to look for in the headers - * @return The header value for the given key, or an empty string if the header is not present or has no value. - */ - private static String getHeader(Headers headers, String key) { - Header header = headers.lastHeader(key); - if (header == null || header.value() == null) { - return StringUtils.EMPTY; - } - String value = new String(header.value(), StandardCharsets.UTF_8); - return StringUtils.defaultString(value); - } -} +} \ No newline at end of file diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrors.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrors.java new file mode 100644 index 00000000..b8623b7a --- /dev/null +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrors.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.michelin.kstreamplify.deduplication; + +import com.michelin.kstreamplify.error.ProcessingResult; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.avro.specific.SpecificRecord; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; + +/** + * Processor class for the deduplication mechanism on headers of a given topic. + * + * @param The type of the value + */ +public class DedupHeadersProcessorWithErrors + implements Processor> { + + /** Window store name, initialized @ construction. */ + private final String windowStoreName; + /** Retention window for the state store. Used for fetching data. */ + private final Duration retentionWindowDuration; + /** Deduplication headers list. */ + private final List deduplicationHeadersList; + /** Kstream context for this processor. */ + private ProcessorContext> processorContext; + /** Window store containing all the records seen on the given window. 
*/ + private WindowStore dedupWindowStore; + + /** + * Constructor. + * + * @param windowStoreName Name of the deduplication state store + * @param retentionWindowDuration Retention window duration + * @param deduplicationHeadersList Deduplication headers list + */ + public DedupHeadersProcessorWithErrors( + String windowStoreName, Duration retentionWindowDuration, List deduplicationHeadersList) { + this.windowStoreName = windowStoreName; + this.retentionWindowDuration = retentionWindowDuration; + this.deduplicationHeadersList = deduplicationHeadersList; + } + + @Override + public void init(ProcessorContext> context) { + this.processorContext = context; + dedupWindowStore = this.processorContext.getStateStore(windowStoreName); + } + + @Override + public void process(Record message) { + try { + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); + String identifier = buildIdentifier(message.headers()); + + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + identifier, + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (identifier.equals(currentKeyValue.value)) { + return; + } + } + } + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(identifier, identifier, message.timestamp()); + processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); + } catch (Exception e) { + processorContext.forward(ProcessingResult.wrapRecordFailure( + e, + message, + "Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform")); + } + } + + /** + * Build an identifier for the record based on the headers and the keys 
provided. + * + * @param headers The headers of the record + * @return The built identifier + */ + private String buildIdentifier(Headers headers) { + return deduplicationHeadersList.stream() + .map(key -> getHeader(headers, key)) + .collect(Collectors.joining("#")); + } + + /** + * Get the header value for a given key + * + * @param headers headers of the record + * @param key the key to look for in the headers + * @return The header value for the given key, or an empty string if the header is not present or has no value. + */ + private static String getHeader(Headers headers, String key) { + Header header = headers.lastHeader(key); + if (header == null || header.value() == null) { + return StringUtils.EMPTY; + } + String value = new String(header.value(), StandardCharsets.UTF_8); + return StringUtils.defaultString(value); + } +} diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java index 7b4f1655..57a57257 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java @@ -18,39 +18,44 @@ */ package com.michelin.kstreamplify.deduplication; -import com.michelin.kstreamplify.error.ProcessingResult; -import java.time.Duration; -import java.time.Instant; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.WindowStore; +import java.time.Duration; +import java.time.Instant; + /** - * Transformer class for the deduplication mechanism on keys of a given topic. + * Processor class for the deduplication mechanism on keys of a given topic. 
* * @param The type of the value */ public class DedupKeyProcessor - implements Processor> { - - /** Context for this transformer. */ - private ProcessorContext> processorContext; + implements Processor { - /** Window store containing all the records seen on the given window. */ - private WindowStore dedupWindowStore; - - /** Window store name, initialized @ construction. */ + /** + * Window store name, initialized @ construction. + */ private final String windowStoreName; - - /** Retention window for the state store. Used for fetching data. */ + /** + * Retention window for the state store. Used for fetching data. + */ private final Duration retentionWindowDuration; + /** + * Context for this processor. + */ + private ProcessorContext processorContext; + /** + * Window store containing all the records seen on the given window. + */ + private WindowStore dedupWindowStore; /** * Constructor. * - * @param windowStoreName The name of the constructor + * @param windowStoreName The name of the constructor * @param retentionWindowDuration The retentionWindow Duration */ public DedupKeyProcessor(String windowStoreName, Duration retentionWindowDuration) { @@ -59,39 +64,32 @@ public DedupKeyProcessor(String windowStoreName, Duration retentionWindowDuratio } @Override - public void init(ProcessorContext> context) { + public void init(ProcessorContext context) { processorContext = context; dedupWindowStore = this.processorContext.getStateStore(windowStoreName); } @Override public void process(Record message) { - try { - // Get the record timestamp - var currentInstant = Instant.ofEpochMilli(message.timestamp()); + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); - // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) - try (var resultIterator = dedupWindowStore.backwardFetch( - message.key(), - currentInstant.minus(retentionWindowDuration), - 
currentInstant.plus(retentionWindowDuration))) { - while (resultIterator != null && resultIterator.hasNext()) { - var currentKeyValue = resultIterator.next(); - if (message.key().equals(currentKeyValue.value)) { - return; - } + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + message.key(), + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (message.key().equals(currentKeyValue.value)) { + return; } } - - // First time we see this record, store entry in the window store and forward the record to the output - dedupWindowStore.put(message.key(), message.key(), message.timestamp()); - processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); - } catch (Exception e) { - processorContext.forward(ProcessingResult.wrapRecordFailure( - e, - message, - "Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform")); } + + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(message.key(), message.key(), message.timestamp()); + processorContext.forward(message); + } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrors.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrors.java new file mode 100644 index 00000000..aeeaf065 --- /dev/null +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrors.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.michelin.kstreamplify.deduplication; + +import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import java.time.Instant; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; + +/** + * Processor class for the deduplication mechanism on keys of a given topic. + * + * @param The type of the value + */ +public class DedupKeyProcessorWithErrors + implements Processor> { + + /** Context for this processor. */ + private ProcessorContext> processorContext; + + /** Window store containing all the records seen on the given window. */ + private WindowStore dedupWindowStore; + + /** Window store name, initialized @ construction. */ + private final String windowStoreName; + + /** Retention window for the state store. Used for fetching data. */ + private final Duration retentionWindowDuration; + + /** + * Constructor. 
+ * + * @param windowStoreName The name of the constructor + * @param retentionWindowDuration The retentionWindow Duration + */ + public DedupKeyProcessorWithErrors(String windowStoreName, Duration retentionWindowDuration) { + this.windowStoreName = windowStoreName; + this.retentionWindowDuration = retentionWindowDuration; + } + + @Override + public void init(ProcessorContext> context) { + processorContext = context; + dedupWindowStore = this.processorContext.getStateStore(windowStoreName); + } + + @Override + public void process(Record message) { + try { + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); + + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + message.key(), + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (message.key().equals(currentKeyValue.value)) { + return; + } + } + } + + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(message.key(), message.key(), message.timestamp()); + processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); + } catch (Exception e) { + processorContext.forward(ProcessingResult.wrapRecordFailure( + e, + message, + "Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform")); + } + } +} diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java index cfb0812c..339d2bcd 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java +++ 
b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java @@ -18,39 +18,44 @@ */ package com.michelin.kstreamplify.deduplication; -import com.michelin.kstreamplify.error.ProcessingResult; -import java.time.Duration; -import java.time.Instant; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.WindowStore; +import java.time.Duration; +import java.time.Instant; + /** - * Transformer class for the deduplication mechanism on both keys and values of a given topic. + * Processor class for the deduplication mechanism on both keys and values of a given topic. * * @param The type of the value */ public class DedupKeyValueProcessor - implements Processor> { + implements Processor { - /** Kstream context for this transformer. */ - private ProcessorContext> processorContext; - - /** Window store containing all the records seen on the given window. */ - private WindowStore dedupWindowStore; - - /** Window store name, initialized @ construction. */ + /** + * Window store name, initialized @ construction. + */ private final String windowStoreName; - - /** Retention window for the state store. Used for fetching data. */ + /** + * Retention window for the state store. Used for fetching data. + */ private final Duration retentionWindowDuration; + /** + * Kstream context for this processor. + */ + private ProcessorContext processorContext; + /** + * Window store containing all the records seen on the given window. + */ + private WindowStore dedupWindowStore; /** * Constructor. 
* - * @param windowStoreName The window store name + * @param windowStoreName The window store name * @param retentionWindowHours The retention window duration */ public DedupKeyValueProcessor(String windowStoreName, Duration retentionWindowHours) { @@ -59,7 +64,7 @@ public DedupKeyValueProcessor(String windowStoreName, Duration retentionWindowHo } @Override - public void init(ProcessorContext> context) { + public void init(ProcessorContext context) { this.processorContext = context; dedupWindowStore = this.processorContext.getStateStore(windowStoreName); @@ -67,32 +72,24 @@ public void init(ProcessorContext> context) { @Override public void process(Record message) { - try { - // Get the record timestamp - var currentInstant = Instant.ofEpochMilli(message.timestamp()); + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); - // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) - try (var resultIterator = dedupWindowStore.backwardFetch( - message.key(), - currentInstant.minus(retentionWindowDuration), - currentInstant.plus(retentionWindowDuration))) { - while (resultIterator != null && resultIterator.hasNext()) { - var currentKeyValue = resultIterator.next(); - if (message.value().equals(currentKeyValue.value)) { - return; - } + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + message.key(), + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (message.value().equals(currentKeyValue.value)) { + return; } } - - // First time we see this record, store entry in the window store and forward the record to the output - dedupWindowStore.put(message.key(), message.value(), message.timestamp()); - 
processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); - } catch (Exception e) { - processorContext.forward(ProcessingResult.wrapRecordFailure( - e, - message, - "Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform")); } + + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(message.key(), message.value(), message.timestamp()); + processorContext.forward(message); } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrors.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrors.java new file mode 100644 index 00000000..8e47121d --- /dev/null +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrors.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.michelin.kstreamplify.deduplication; + +import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import java.time.Instant; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; + +/** + * Processor class for the deduplication mechanism on both keys and values of a given topic. + * + * @param The type of the value + */ +public class DedupKeyValueProcessorWithErrors + implements Processor> { + + /** Kstream context for this processor. */ + private ProcessorContext> processorContext; + + /** Window store containing all the records seen on the given window. */ + private WindowStore dedupWindowStore; + + /** Window store name, initialized @ construction. */ + private final String windowStoreName; + + /** Retention window for the state store. Used for fetching data. */ + private final Duration retentionWindowDuration; + + /** + * Constructor. 
+ * + * @param windowStoreName The window store name + * @param retentionWindowHours The retention window duration + */ + public DedupKeyValueProcessorWithErrors(String windowStoreName, Duration retentionWindowHours) { + this.windowStoreName = windowStoreName; + this.retentionWindowDuration = retentionWindowHours; + } + + @Override + public void init(ProcessorContext> context) { + this.processorContext = context; + + dedupWindowStore = this.processorContext.getStateStore(windowStoreName); + } + + @Override + public void process(Record message) { + try { + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); + + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + message.key(), + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (message.value().equals(currentKeyValue.value)) { + return; + } + } + } + + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(message.key(), message.value(), message.timestamp()); + processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); + } catch (Exception e) { + processorContext.forward(ProcessingResult.wrapRecordFailure( + e, + message, + "Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform")); + } + } +} diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java index 4f0e0bf1..0b71bd48 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java 
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java @@ -18,45 +18,51 @@ */ package com.michelin.kstreamplify.deduplication; -import com.michelin.kstreamplify.error.ProcessingResult; -import java.time.Duration; -import java.time.Instant; -import java.util.function.Function; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.WindowStore; +import java.time.Duration; +import java.time.Instant; +import java.util.function.Function; + /** - * Transformer class for the deduplication mechanism on predicate of a given topic. + * Processor class for the deduplication mechanism on predicate of a given topic. * * @param The type of the key * @param The type of the value */ public class DedupWithPredicateProcessor - implements Processor> { + implements Processor { - /** Kstream context for this transformer. */ - private ProcessorContext> processorContext; - - /** Window store containing all the records seen on the given window. */ - private WindowStore dedupWindowStore; - - /** Window store name, initialized @ construction. */ + /** + * Window store name, initialized @ construction. + */ private final String windowStoreName; - - /** Retention window for the state store. Used for fetching data. */ + /** + * Retention window for the state store. Used for fetching data. + */ private final Duration retentionWindowDuration; - - /** Deduplication key extractor. */ + /** + * Deduplication key extractor. + */ private final Function deduplicationKeyExtractor; + /** + * Kstream context for this processor. + */ + private ProcessorContext processorContext; + /** + * Window store containing all the records seen on the given window. + */ + private WindowStore dedupWindowStore; /** * Constructor. 
* - * @param windowStoreName Name of the deduplication state store - * @param retentionWindowDuration Retention window duration + * @param windowStoreName Name of the deduplication state store + * @param retentionWindowDuration Retention window duration * @param deduplicationKeyExtractor Deduplication function */ public DedupWithPredicateProcessor( @@ -67,41 +73,34 @@ public DedupWithPredicateProcessor( } @Override - public void init(ProcessorContext> context) { + public void init(ProcessorContext context) { this.processorContext = context; dedupWindowStore = this.processorContext.getStateStore(windowStoreName); } @Override public void process(Record message) { - try { - // Get the record timestamp - var currentInstant = Instant.ofEpochMilli(message.timestamp()); - String identifier = deduplicationKeyExtractor.apply(message.value()); + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); + String identifier = deduplicationKeyExtractor.apply(message.value()); - // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) - try (var resultIterator = dedupWindowStore.backwardFetch( - identifier, - currentInstant.minus(retentionWindowDuration), - currentInstant.plus(retentionWindowDuration))) { - while (resultIterator != null && resultIterator.hasNext()) { - var currentKeyValue = resultIterator.next(); - if (identifier.equals(deduplicationKeyExtractor.apply(currentKeyValue.value))) { - return; - } + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + identifier, + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (identifier.equals(deduplicationKeyExtractor.apply(currentKeyValue.value))) { + return; } } + } + + // 
First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(identifier, message.value(), message.timestamp()); + processorContext.forward(message); - // First time we see this record, store entry in the window store and forward the record to the output - dedupWindowStore.put(identifier, message.value(), message.timestamp()); - processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); - } catch (Exception e) { - processorContext.forward(ProcessingResult.wrapRecordFailure( - e, - message, - "Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform")); - } } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrors.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrors.java new file mode 100644 index 00000000..75dde8b3 --- /dev/null +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrors.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.michelin.kstreamplify.deduplication; + +import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import java.time.Instant; +import java.util.function.Function; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; + +/** + * Processor class for the deduplication mechanism on predicate of a given topic. + * + * @param The type of the key + * @param The type of the value + */ +public class DedupWithPredicateProcessorWithErrors + implements Processor> { + + /** Kstream context for this processor. */ + private ProcessorContext> processorContext; + + /** Window store containing all the records seen on the given window. */ + private WindowStore dedupWindowStore; + + /** Window store name, initialized @ construction. */ + private final String windowStoreName; + + /** Retention window for the state store. Used for fetching data. */ + private final Duration retentionWindowDuration; + + /** Deduplication key extractor. */ + private final Function deduplicationKeyExtractor; + + /** + * Constructor. 
+ * + * @param windowStoreName Name of the deduplication state store + * @param retentionWindowDuration Retention window duration + * @param deduplicationKeyExtractor Deduplication function + */ + public DedupWithPredicateProcessorWithErrors( + String windowStoreName, Duration retentionWindowDuration, Function deduplicationKeyExtractor) { + this.windowStoreName = windowStoreName; + this.retentionWindowDuration = retentionWindowDuration; + this.deduplicationKeyExtractor = deduplicationKeyExtractor; + } + + @Override + public void init(ProcessorContext> context) { + this.processorContext = context; + dedupWindowStore = this.processorContext.getStateStore(windowStoreName); + } + + @Override + public void process(Record message) { + try { + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); + String identifier = deduplicationKeyExtractor.apply(message.value()); + + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + identifier, + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (identifier.equals(deduplicationKeyExtractor.apply(currentKeyValue.value))) { + return; + } + } + } + + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(identifier, message.value(), message.timestamp()); + processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); + + } catch (Exception e) { + processorContext.forward(ProcessingResult.wrapRecordFailure( + e, + message, + "Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform")); + } + } +} diff --git 
a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java index 08e10032..fe8141e9 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java @@ -40,21 +40,39 @@ public final class DeduplicationUtils { private DeduplicationUtils() {} + /** @deprecated Since 1.7.0, use {@link #distinctKeysWithErrors(StreamsBuilder, KStream, Duration)} instead. */ + @Deprecated(since = "1.7.0", forRemoval = true) + public static KStream> deduplicateKeys( + StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { + + return distinctKeysWithErrors( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration); + } + /** - * Deduplicate the input stream on the input key using a window store for the given period of time. This constructor - * should not be used if using the deduplicator multiple times in the same topology. - * - * @param streamsBuilder Stream builder instance for topology editing - * @param initialStream Stream containing the events that should be deduplicated - * @param windowDuration Window of time on which we should watch out for duplicates - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a - * String as the key is retarded. You can quote me on this. - * @return KStream with a processingResult + * @deprecated Since 1.7.0, use {@link #distinctKeysWithErrors(StreamsBuilder, KStream, String, String, Duration)} + * instead. 
*/ + @Deprecated(since = "1.7.0", forRemoval = true) public static KStream> deduplicateKeys( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration) { + + return distinctKeysWithErrors(streamsBuilder, initialStream, storeName, repartitionName, windowDuration); + } + + /** See {@link #distinctKeysWithErrors(StreamsBuilder, KStream, String, String, Duration)} */ + public static KStream> distinctKeysWithErrors( StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { - return deduplicateKeys( + return distinctKeysWithErrors( streamsBuilder, initialStream, DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, @@ -63,18 +81,21 @@ public static KStream> } /** - * Deduplicate the input stream on the input key using a window store for the given period of time. + * Deduplicates records from the input stream using the record key. + * + *
<p>
Records with identical keys within the configured time window are considered duplicates and are filtered out. + * + *
<p>
A window store is used to track seen keys during the specified {@code windowDuration}. * - * @param streamsBuilder Stream builder instance for topology editing - * @param initialStream Stream containing the events that should be deduplicated - * @param storeName State store name - * @param repartitionName Repartition topic name - * @param windowDuration Window of time to keep in the window store - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a - * String as the key is retarded. You can quote me on this. - * @return Resulting de-duplicated Stream + * @param streamsBuilder the {@link StreamsBuilder} used to build the topology + * @param initialStream the input stream to deduplicate (must have String keys) + * @param storeName the name of the state store used for deduplication + * @param repartitionName the name of the repartition topic + * @param windowDuration the time window during which duplicates are filtered + * @param the value type of the stream + * @return a deduplicated stream containing {@link ProcessingResult} */ - public static KStream> deduplicateKeys( + public static KStream> distinctKeysWithErrors( StreamsBuilder streamsBuilder, KStream initialStream, String storeName, @@ -90,24 +111,65 @@ public static KStream> var repartitioned = initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) .withName(repartitionName)); - return repartitioned.process(() -> new DedupKeyProcessor<>(storeName, windowDuration), storeName); + + return repartitioned.process(() -> new DedupKeyProcessorWithErrors<>(storeName, windowDuration), storeName); + } + + /** See {@link #distinctKeys(StreamsBuilder, KStream, String, String, Duration)} */ + public static KStream distinctKeys( + StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { + + return distinctKeys( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + 
DEFAULT_REPARTITION, + windowDuration); } /** - * Deduplicate the input stream on the input key and value using a window store for the given period of time. This - * constructor should not be used if using the deduplicator multiple times in the same topology. + * Deduplicates records from the input stream using the record key. + * + *
<p>
Records with identical keys within the configured time window are considered duplicates and are filtered out. + * + *
<p>
A window store is used to track seen keys during the specified {@code windowDuration}. * - * @param streamsBuilder Stream builder instance for topology editing - * @param initialStream Stream containing the events that should be deduplicated - * @param windowDuration Window of time on which we should watch out for duplicates - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a - * String as the key is retarded. You can quote me on this. - * @return KStream with a processingResult + * @param streamsBuilder the {@link StreamsBuilder} used to build the topology + * @param initialStream the input stream to deduplicate (must have String keys) + * @param storeName the name of the state store used for deduplication + * @param repartitionName the name of the repartition topic + * @param windowDuration the time window during which duplicates are filtered + * @param the value type of the stream + * @return a deduplicated stream containing + */ + public static KStream distinctKeys( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration) { + + StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( + Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), + Serdes.String(), + Serdes.String()); + streamsBuilder.addStateStore(dedupWindowStore); + + var repartitioned = + initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) + .withName(repartitionName)); + + return repartitioned.process(() -> new DedupKeyProcessor<>(storeName, windowDuration), storeName); + } + + /** + * @deprecated Since 1.7.0, use {@link #distinctByKeyValuesWithErrors(StreamsBuilder, KStream, Duration)} instead. 
*/ + @Deprecated(since = "1.7.0", forRemoval = true) public static KStream> deduplicateKeyValues( StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { - return deduplicateKeyValues( + return distinctByKeyValuesWithErrors( streamsBuilder, initialStream, DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, @@ -116,18 +178,10 @@ public static KStream> } /** - * Deduplicate the input stream on the input key and Value using a window store for the given period of time. The - * input stream should have a String key. - * - * @param streamsBuilder Stream builder instance for topology editing - * @param initialStream Stream containing the events that should be deduplicated - * @param storeName State store name - * @param repartitionName Repartition topic name - * @param windowDuration Window of time to keep in the window store - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a - * String as the key is retarded. You can quote me on this. - * @return Resulting de-duplicated Stream + * @deprecated Since 1.7.0, use {@link #distinctByKeyValuesWithErrors(StreamsBuilder, KStream, String, String, + * Duration)} instead. 
*/ + @Deprecated(since = "1.7.0", forRemoval = true) public static KStream> deduplicateKeyValues( StreamsBuilder streamsBuilder, KStream initialStream, @@ -135,6 +189,42 @@ public static KStream> String repartitionName, Duration windowDuration) { + return distinctByKeyValuesWithErrors(streamsBuilder, initialStream, storeName, repartitionName, windowDuration); + } + + /** See {@link #distinctByKeyValuesWithErrors(StreamsBuilder, KStream, String, String, Duration)} */ + public static KStream> distinctByKeyValuesWithErrors( + StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { + + return distinctByKeyValuesWithErrors( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration); + } + + /** + * Deduplicates records from the input stream using both key and value. + * + *
<p>
Records with identical key-value pairs within the configured time window are considered duplicates and are + * filtered out. + * + * @param streamsBuilder the {@link StreamsBuilder} used to build the topology + * @param initialStream the input stream to deduplicate + * @param storeName the name of the state store used for deduplication + * @param repartitionName the name of the repartition topic + * @param windowDuration the time window during which duplicates are filtered + * @param the value type of the stream + * @return a deduplicated stream containing {@link ProcessingResult} + */ + public static KStream> distinctByKeyValuesWithErrors( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration) { + StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), Serdes.String(), @@ -144,64 +234,132 @@ public static KStream> var repartitioned = initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) .withName(repartitionName)); - return repartitioned.process(() -> new DedupKeyValueProcessor<>(storeName, windowDuration), storeName); + + return repartitioned.process( + () -> new DedupKeyValueProcessorWithErrors<>(storeName, windowDuration), storeName); + } + + /** See {@link #distinctByKeyValues(StreamsBuilder, KStream, String, String, Duration)} */ + public static KStream distinctByKeyValues( + StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { + + return distinctByKeyValues( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration); } /** - * Deduplicate the input stream by applying the deduplicationKeyExtractor function on each record to generate a - * unique signature for the record. Uses a window store for the given period of time. 
The input stream should have a - * String key. This constructor should not be used if using the deduplicator multiple times in the same topology. - * Use {@link DeduplicationUtils#deduplicateWithPredicate(StreamsBuilder, KStream, String storeName, String - * repartitionName, Duration, Function)} in this scenario. + * Deduplicates records from the input stream using both key and value. + * + *
<p>
Records with identical key-value pairs within the configured time window are considered duplicates and are + * filtered out. * - * @param streamsBuilder Stream builder instance for topology editing - * @param initialStream Stream containing the events that should be deduplicated - * @param windowDuration Window of time to keep in the window store - * @param deduplicationKeyExtractor Function that should extract a deduplication key in String format. This key acts - * like a comparison vector. A recommended approach is to concatenate all necessary fields in String format to - * provide a unique identifier for comparison between records. - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a - * String as the key is retarded. You can quote me on this. - * @return Resulting de-duplicated Stream + * @param streamsBuilder the {@link StreamsBuilder} used to build the topology + * @param initialStream the input stream to deduplicate + * @param storeName the name of the state store used for deduplication + * @param repartitionName the name of the repartition topic + * @param windowDuration the time window during which duplicates are filtered + * @param the value type of the stream + * @return a deduplicated stream containing */ + public static KStream distinctByKeyValues( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration) { + + StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( + Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), + Serdes.String(), + SerdesUtils.getValueSerdes()); + streamsBuilder.addStateStore(dedupWindowStore); + + var repartitioned = + initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) + .withName(repartitionName)); + + return repartitioned.process(() -> new DedupKeyValueProcessor<>(storeName, windowDuration), storeName); + } + + /** + * 
@deprecated Since 1.7.0, use {@link #distinctByPredicateWithErrors(StreamsBuilder, KStream, Duration, Function)} + * instead. + */ + @Deprecated(since = "1.7.0", forRemoval = true) public static KStream> deduplicateWithPredicate( StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration, - Function deduplicationKeyExtractor) { - return deduplicateWithPredicate( + Function extractor) { + + return distinctByPredicateWithErrors( streamsBuilder, initialStream, DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, windowDuration, - deduplicationKeyExtractor); + extractor); } /** - * Deduplicate the input stream by applying the deduplicationKeyExtractor function on each record to generate a - * unique signature for the record. Uses a window store for the given period of time. The input stream should have a - * String key. - * - * @param streamsBuilder Stream builder instance for topology editing - * @param initialStream Stream containing the events that should be deduplicated - * @param storeName State store name - * @param repartitionName Repartition topic name - * @param windowDuration Window of time to keep in the window store - * @param deduplicationKeyExtractor Function that should extract a deduplication key in String format. This key acts - * like a comparison vector. A recommended approach is to concatenate all necessary fields in String format to - * provide a unique identifier for comparison between records. - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a - * String as the key is retarded. You can quote me on this. - * @return Resulting de-duplicated Stream + * @deprecated Since 1.7.0, use {@link #distinctByPredicateWithErrors(StreamsBuilder, KStream, String, String, + * Duration, Function)} instead. 
*/ + @Deprecated(since = "1.7.0", forRemoval = true) public static KStream> deduplicateWithPredicate( StreamsBuilder streamsBuilder, KStream initialStream, String storeName, String repartitionName, Duration windowDuration, - Function deduplicationKeyExtractor) { + Function extractor) { + + return distinctByPredicateWithErrors( + streamsBuilder, initialStream, storeName, repartitionName, windowDuration, extractor); + } + + /** See {@link #distinctByPredicateWithErrors(StreamsBuilder, KStream, String, String, Duration, Function)} */ + public static KStream> distinctByPredicateWithErrors( + StreamsBuilder streamsBuilder, + KStream initialStream, + Duration windowDuration, + Function extractor) { + + return distinctByPredicateWithErrors( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration, + extractor); + } + + /** + * Deduplicates records from the input stream using a computed deduplication key. + * + *
<p>
The provided extractor builds a deduplication key for each record. Records with identical keys within the + * configured time window are considered duplicates and are filtered out. + * + * @param streamsBuilder the {@link StreamsBuilder} + * @param initialStream the input stream + * @param storeName state store name + * @param repartitionName repartition topic name + * @param windowDuration deduplication window + * @param extractor function building the deduplication key + * @param value type + * @return a deduplicated stream containing {@link ProcessingResult} + */ + public static KStream> distinctByPredicateWithErrors( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration, + Function extractor) { StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), @@ -212,39 +370,164 @@ public static KStream> var repartitioned = initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) .withName(repartitionName)); + return repartitioned.process( - () -> new DedupWithPredicateProcessor<>(storeName, windowDuration, deduplicationKeyExtractor), - storeName); + () -> new DedupWithPredicateProcessorWithErrors<>(storeName, windowDuration, extractor), storeName); + } + + /** See {@link #distinctByPredicateWithErrors(StreamsBuilder, KStream, String, String, Duration, Function)} */ + public static KStream> distinctByPredicate( + StreamsBuilder streamsBuilder, + KStream initialStream, + Duration windowDuration, + Function extractor) { + + return distinctByPredicateWithErrors( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration, + extractor); } /** - * Deduplicates records from the input stream based on a computed key derived from each record. 
+ * Deduplicates records from the input stream using a computed deduplication key. * - *
<p>
The provided {@code deduplicationHeadersExtractor} generates a list of String values that together form a - * unique identifier for a record. Records with the same identifier within the given time window are considered - * duplicates. + *
<p>
The provided extractor builds a deduplication key for each record. Records with identical keys within the + * configured time window are considered duplicates and are filtered out. * - *
<p>
A window store is used to track seen identifiers for the specified {@code windowDuration}. + * @param streamsBuilder the {@link StreamsBuilder} + * @param initialStream the input stream + * @param storeName state store name + * @param repartitionName repartition topic name + * @param windowDuration deduplication window + * @param extractor function building the deduplication key + * @param value type + * @return a deduplicated stream containing + */ + public static KStream distinctByPredicate( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration, + Function extractor) { + + StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( + Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), + Serdes.String(), + SerdesUtils.getValueSerdes()); + streamsBuilder.addStateStore(dedupWindowStore); + + var repartitioned = + initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) + .withName(repartitionName)); + + return repartitioned.process( + () -> new DedupWithPredicateProcessor<>(storeName, windowDuration, extractor), storeName); + } + + /** + * @deprecated Since 1.7.0, use {@link #distinctByHeadersWithErrors(StreamsBuilder, KStream, Duration, List)} + * instead. + */ + @Deprecated(since = "1.7.0", forRemoval = true) + public static KStream> deduplicateWithHeaders( + StreamsBuilder streamsBuilder, + KStream initialStream, + Duration windowDuration, + List deduplicationHeadersList) { + + return distinctByHeadersWithErrors( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration, + deduplicationHeadersList); + } + + /** + * @deprecated since 1.7.0, use {@link #distinctByHeadersWithErrors(StreamsBuilder, KStream, String, String, + * Duration, List)} instead. 
+ */ + @Deprecated(since = "1.7.0", forRemoval = true) + public static KStream> deduplicateWithHeaders( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration, + List deduplicationHeadersList) { + + return distinctByHeadersWithErrors( + streamsBuilder, initialStream, storeName, repartitionName, windowDuration, deduplicationHeadersList); + } + + /** See {@link #distinctByHeadersWithErrors(StreamsBuilder, KStream, String, String, Duration, List)} */ + public static KStream> distinctByHeadersWithErrors( + StreamsBuilder streamsBuilder, + KStream initialStream, + Duration windowDuration, + List deduplicationHeadersList) { + + return distinctByHeadersWithErrors( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration, + deduplicationHeadersList); + } + + /** + * Deduplicates records from the input stream using a composite key built from the provided headers. * - *

Note: This method uses internally generated store and repartition names. It should not be used multiple - * times in the same topology. In such cases, use {@link DeduplicationUtils#deduplicateWithHeaders(StreamsBuilder, - * KStream, String, String, Duration, List)}. + *

The {@code deduplicationHeadersList} defines which headers are used to build the deduplication key. Records + * with identical header values within the configured time window are considered duplicates and filtered out. + * + *

A window store is used to track seen keys during the specified {@code windowDuration}. * * @param streamsBuilder the {@link StreamsBuilder} used to build the topology * @param initialStream the input stream to deduplicate (must have String keys) + * @param storeName the name of the state store used for deduplication + * @param repartitionName the name of the repartition topic * @param windowDuration the time window during which duplicates are filtered - * @param deduplicationHeadersList list of header names to extract from each record for deduplication. The - * combination of these header values forms the unique identifier for deduplication. + * @param deduplicationHeadersList list of header names used to build the deduplication key * @param the value type of the stream * @return a deduplicated stream containing {@link ProcessingResult} */ - public static KStream> deduplicateWithHeaders( + public static KStream> distinctByHeadersWithErrors( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration, + List deduplicationHeadersList) { + + StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( + Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), + Serdes.String(), + Serdes.String()); + streamsBuilder.addStateStore(dedupWindowStore); + + var repartitioned = + initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) + .withName(repartitionName)); + return repartitioned.process( + () -> new DedupHeadersProcessorWithErrors<>(storeName, windowDuration, deduplicationHeadersList), + storeName); + } + + /** See {@link #distinctByHeaders(StreamsBuilder, KStream, String, String, Duration, List)} */ + public static KStream distinctByHeaders( StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration, List deduplicationHeadersList) { - return deduplicateWithHeaders( + return distinctByHeaders( streamsBuilder, 
initialStream, DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, @@ -254,25 +537,23 @@ public static KStream> } /** - * Deduplicates records from the input stream based on a computed key derived from each record. + * Deduplicates records from the input stream using a composite key built from the provided headers. * - *

The {@code deduplicationHeadersExtractor} produces a list of String values used to build a unique identifier - * for each record. Records sharing the same identifier within the configured time window are considered duplicates. + *

The {@code deduplicationHeadersList} defines which headers are used to build the deduplication key. Records + * with identical header values within the configured time window are considered duplicates and filtered out. * - *

This variant allows specifying custom state store and repartition names, making it suitable for reuse within - * the same topology. + *

A window store is used to track seen keys during the specified {@code windowDuration}. * * @param streamsBuilder the {@link StreamsBuilder} used to build the topology * @param initialStream the input stream to deduplicate (must have String keys) * @param storeName the name of the state store used for deduplication * @param repartitionName the name of the repartition topic * @param windowDuration the time window during which duplicates are filtered - * @param deduplicationHeadersList list of header names to extract from each record for deduplication. The - * combination of these header values forms the unique identifier for deduplication. + * @param deduplicationHeadersList list of header names used to build the deduplication key * @param the value type of the stream - * @return a deduplicated stream containing {@link ProcessingResult} + * @return a deduplicated stream */ - public static KStream> deduplicateWithHeaders( + public static KStream distinctByHeaders( StreamsBuilder streamsBuilder, KStream initialStream, String storeName, @@ -292,4 +573,4 @@ public static KStream> return repartitioned.process( () -> new DedupHeadersProcessor<>(storeName, windowDuration, deduplicationHeadersList), storeName); } -} +} \ No newline at end of file diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorTest.java index 57ef2c47..760f6497 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorTest.java @@ -18,6 +18,7 @@ */ package com.michelin.kstreamplify.deduplication; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static 
org.mockito.ArgumentMatchers.anyString; @@ -28,7 +29,6 @@ import static org.mockito.Mockito.when; import com.michelin.kstreamplify.avro.KafkaError; -import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; import java.util.List; import org.apache.kafka.streams.KeyValue; @@ -48,7 +48,7 @@ class DedupHeadersProcessorTest { private DedupHeadersProcessor processor; @Mock - private ProcessorContext> context; + private ProcessorContext context; @Mock private WindowStore windowStore; @@ -76,7 +76,7 @@ void shouldProcessNewRecord() { processor.process(message); verify(windowStore).put("value-1#value-3", "value-1#value-3", message.timestamp()); - verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + verify(context).forward(argThat(arg -> arg.value().equals(message.value()))); } @Test @@ -115,12 +115,6 @@ void shouldThrowException() { .thenThrow(new RuntimeException("Exception...")); doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); - processor.process(message); - - verify(context).forward(argThat(arg -> arg.value() - .getError() - .getContextMessage() - .equals("Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform"))); + assertThrows(RuntimeException.class, () -> processor.process(message)); } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrorsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrorsTest.java new file mode 100644 index 00000000..36d81889 --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrorsTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.michelin.kstreamplify.deduplication; + +import com.michelin.kstreamplify.avro.KafkaError; +import com.michelin.kstreamplify.error.ProcessingResult; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Duration; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class DedupHeadersProcessorWithErrorsTest { + + private DedupHeadersProcessorWithErrors processor; + + @Mock 
+ private ProcessorContext> context; + + @Mock + private WindowStore windowStore; + + @Mock + private WindowStoreIterator windowStoreIterator; + + @BeforeEach + void setUp() { + // Create an instance of DedupWithHeadersProcessor for testing + processor = new DedupHeadersProcessorWithErrors<>("testStore", Duration.ofHours(1), List.of("header-1", "header-3")); + // Stub the context.getStateStore method to return the mock store + when(context.getStateStore("testStore")).thenReturn(windowStore); + + processor.init(context); + } + + @Test + void shouldProcessNewRecord() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0); + message.headers().add("header-1", "value-1".getBytes()); + message.headers().add("header-2", "value-2".getBytes()); + message.headers().add("header-3", "value-3".getBytes()); + processor.process(message); + + verify(windowStore).put("value-1#value-3", "value-1#value-3", message.timestamp()); + verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + } + + @Test + void shouldProcessDuplicate() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0); + message.headers().add("header-1", "value-1".getBytes()); + message.headers().add("header-2", "value-2".getBytes()); + message.headers().add("header-3", "value-3".getBytes()); + + // Simulate hasNext() returning true once and then false + when(windowStoreIterator.hasNext()).thenReturn(true); + + // Simulate the condition to trigger the return statement + when(windowStoreIterator.next()).thenReturn(KeyValue.pair(0L, "value-1#value-3")); + + // Simulate the backwardFetch() method returning the mocked ResultIterator + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(windowStoreIterator); + + // Call the process method + processor.process(message); + + verify(windowStore, never()).put(anyString(), any(), anyLong()); + verify(context, 
never()).forward(any()); + } + + @Test + void shouldThrowException() { + Record message = new Record<>("key", new KafkaError(), 0L); + message.headers().add("header-1", "value-1".getBytes()); + message.headers().add("header-2", "value-2".getBytes()); + message.headers().add("header-3", "value-3".getBytes()); + + when(windowStore.backwardFetch(any(), any(), any())) + .thenReturn(null) + .thenThrow(new RuntimeException("Exception...")); + doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); + + processor.process(message); + + verify(context).forward(argThat(arg -> arg.value() + .getError() + .getContextMessage() + .equals("Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform"))); + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java index 1158d087..f18031d9 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java @@ -18,6 +18,7 @@ */ package com.michelin.kstreamplify.deduplication; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -47,7 +48,7 @@ class DedupKeyProcessorTest { private DedupKeyProcessor processor; @Mock - private ProcessorContext> context; + private ProcessorContext context; @Mock private WindowStore windowStore; @@ -74,7 +75,7 @@ void shouldProcessNewRecord() { processor.process(message); verify(windowStore).put("key", "key", message.timestamp()); - verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + 
verify(context).forward(argThat(arg -> arg.value().equals(message.value()))); } @Test @@ -107,12 +108,7 @@ void shouldThrowException() { .thenThrow(new RuntimeException("Exception...")); doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); - processor.process(message); + assertThrows(RuntimeException.class, () -> processor.process(message)); - verify(context).forward(argThat(arg -> arg.value() - .getError() - .getContextMessage() - .equals("Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform"))); } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrorsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrorsTest.java new file mode 100644 index 00000000..611e9d57 --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrorsTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.michelin.kstreamplify.deduplication; + +import com.michelin.kstreamplify.avro.KafkaError; +import com.michelin.kstreamplify.error.ProcessingResult; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.time.Duration; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class DedupKeyProcessorWithErrorsTest { + + private DedupKeyProcessorWithErrors processor; + + @Mock + private ProcessorContext> context; + + @Mock + private WindowStore windowStore; + + @Mock + private WindowStoreIterator windowStoreIterator; + + @BeforeEach + void setUp() { + // Create an instance of DedupWithPredicateProcessor for testing + processor = new DedupKeyProcessorWithErrors<>("testStore", Duration.ofHours(1)); + + // Stub the context.getStateStore method to return the mock store + when(context.getStateStore("testStore")).thenReturn(windowStore); + + processor.init(context); + } + + @Test + void shouldProcessNewRecord() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0); + + processor.process(message); + + verify(windowStore).put("key", "key", message.timestamp()); + verify(context).forward(argThat(arg -> 
arg.value().getValue().equals(message.value()))); + } + + @Test + void shouldProcessDuplicate() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0L); + + // Simulate hasNext() returning true once and then false + when(windowStoreIterator.hasNext()).thenReturn(true); + + // Simulate the condition to trigger the return statement + when(windowStoreIterator.next()).thenReturn(KeyValue.pair(0L, "key")); + + // Simulate the backwardFetch() method returning the mocked ResultIterator + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(windowStoreIterator); + + // Call the process method + processor.process(message); + + verify(windowStore, never()).put(anyString(), any(), anyLong()); + verify(context, never()).forward(any()); + } + + @Test + void shouldThrowException() { + Record message = new Record<>("key", new KafkaError(), 0L); + + when(windowStore.backwardFetch(any(), any(), any())) + .thenReturn(null) + .thenThrow(new RuntimeException("Exception...")); + doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); + + processor.process(message); + + verify(context).forward(argThat(arg -> arg.value() + .getError() + .getContextMessage() + .equals("Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform"))); + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java index 5ccb1432..33065887 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java @@ -18,18 +18,8 @@ */ package com.michelin.kstreamplify.deduplication; -import static 
org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import com.michelin.kstreamplify.avro.KafkaError; import com.michelin.kstreamplify.error.ProcessingResult; -import java.time.Duration; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; @@ -41,13 +31,25 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + @ExtendWith(MockitoExtension.class) class DedupKeyValueProcessorTest { private DedupKeyValueProcessor processor; @Mock - private ProcessorContext> context; + private ProcessorContext context; @Mock private WindowStore windowStore; @@ -74,7 +76,7 @@ void shouldProcessNewRecord() { processor.process(message); verify(windowStore).put(message.key(), message.value(), message.timestamp()); - verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + verify(context).forward(argThat(arg -> arg.value().equals(message.value()))); } @Test @@ -108,12 +110,7 @@ void shouldThrowException() { doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); // Call the process method - 
processor.process(message); + assertThrows(RuntimeException.class, () -> processor.process(message)); - verify(context).forward(argThat(arg -> arg.value() - .getError() - .getContextMessage() - .equals("Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform"))); } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrorsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrorsTest.java new file mode 100644 index 00000000..1e930677 --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrorsTest.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.michelin.kstreamplify.deduplication; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.michelin.kstreamplify.avro.KafkaError; +import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DedupKeyValueProcessorWithErrorsTest { + + private DedupKeyValueProcessorWithErrors processor; + + @Mock + private ProcessorContext> context; + + @Mock + private WindowStore windowStore; + + @Mock + private WindowStoreIterator windowStoreIterator; + + @BeforeEach + void setUp() { + // Create an instance of DedupWithPredicateProcessor for testing + processor = new DedupKeyValueProcessorWithErrors<>("testStore", Duration.ofHours(1)); + + // Stub the context.getStateStore method to return the mock store + when(context.getStateStore("testStore")).thenReturn(windowStore); + + processor.init(context); + } + + @Test + void shouldProcessNewRecord() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0); + + processor.process(message); + + verify(windowStore).put(message.key(), message.value(), message.timestamp()); + 
verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + } + + @Test + void shouldProcessDuplicate() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0L); + + // Simulate hasNext() returning true once and then false + when(windowStoreIterator.hasNext()).thenReturn(true); + + // Simulate the condition to trigger the return statement + when(windowStoreIterator.next()).thenReturn(KeyValue.pair(0L, kafkaError)); + + // Simulate the backwardFetch() method returning the mocked ResultIterator + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(windowStoreIterator); + + // Call the process method + processor.process(message); + + verify(windowStore, never()).put(anyString(), any(), anyLong()); + verify(context, never()).forward(any()); + } + + @Test + void shouldThrowException() { + final Record message = new Record<>("key", new KafkaError(), 0L); + + when(windowStore.backwardFetch(any(), any(), any())) + .thenReturn(null) + .thenThrow(new RuntimeException("Exception...")); + doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); + + // Call the process method + processor.process(message); + + verify(context).forward(argThat(arg -> arg.value() + .getError() + .getContextMessage() + .equals("Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform"))); + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java index 8481d965..6392767a 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java @@ -18,18 
+18,8 @@ */ package com.michelin.kstreamplify.deduplication; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import com.michelin.kstreamplify.avro.KafkaError; import com.michelin.kstreamplify.error.ProcessingResult; -import java.time.Duration; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -42,12 +32,24 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import java.time.Duration; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + @ExtendWith(MockitoExtension.class) class DedupWithPredicateProcessorTest { private DedupWithPredicateProcessor processor; @Mock - private ProcessorContext> context; + private ProcessorContext context; @Mock private WindowStore windowStore; @@ -74,7 +76,7 @@ void shouldProcessNewRecord() { processor.process(message); verify(windowStore).put("", message.value(), message.timestamp()); - verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + verify(context).forward(argThat(arg -> arg.value().equals(message.value()))); } @Test @@ -108,13 +110,8 @@ void shouldThrowException() { doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), 
any(), anyLong()); // Call the process method - processor.process(message); + assertThrows(RuntimeException.class, () -> processor.process(message)); - verify(context).forward(argThat(arg -> arg.value() - .getError() - .getContextMessage() - .equals("Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform"))); } static class KeyExtractorStub { diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrorsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrorsTest.java new file mode 100644 index 00000000..e052a079 --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrorsTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.michelin.kstreamplify.deduplication; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.michelin.kstreamplify.avro.KafkaError; +import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DedupWithPredicateProcessorWithErrorsTest { + private DedupWithPredicateProcessorWithErrors processor; + + @Mock + private ProcessorContext> context; + + @Mock + private WindowStore windowStore; + + @Mock + private WindowStoreIterator windowStoreIterator; + + @BeforeEach + void setUp() { + // Create an instance of DedupWithPredicateProcessor for testing + processor = new DedupWithPredicateProcessorWithErrors<>("testStore", Duration.ofHours(1), KeyExtractorStub::extract); + + // Stub the context.getStateStore method to return the mock store + when(context.getStateStore("testStore")).thenReturn(windowStore); + + processor.init(context); + } + + @Test + void shouldProcessNewRecord() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0); + + processor.process(message); + + 
verify(windowStore).put("", message.value(), message.timestamp()); + verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + } + + @Test + void shouldProcessDuplicate() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0L); + + // Simulate hasNext() returning true once and then false + when(windowStoreIterator.hasNext()).thenReturn(true); + + // Simulate the condition to trigger the return statement + when(windowStoreIterator.next()).thenReturn(KeyValue.pair(0L, kafkaError)); + + // Simulate the backwardFetch() method returning the mocked ResultIterator + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(windowStoreIterator); + + // Call the process method + processor.process(message); + + verify(windowStore, never()).put(anyString(), any(), anyLong()); + verify(context, never()).forward(any()); + } + + @Test + void shouldThrowException() { + Record message = new Record<>("key", new KafkaError(), 0L); + + when(windowStore.backwardFetch(any(), any(), any())) + .thenReturn(null) + .thenThrow(new RuntimeException("Exception...")); + doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); + + // Call the process method + processor.process(message); + + verify(context).forward(argThat(arg -> arg.value() + .getError() + .getContextMessage() + .equals("Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform"))); + } + + static class KeyExtractorStub { + public static String extract(V v) { + return ""; + } + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java index 9720dfa8..27c0b430 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java 
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java @@ -92,19 +92,19 @@ static class KafkaStreamsStarterStub extends KafkaStreamsStarter { public void topology(StreamsBuilder streamsBuilder) { var streams = TopicWithSerdeStub.inputTopicWithSerde().stream(streamsBuilder); - DeduplicationUtils.deduplicateKeys( + DeduplicationUtils.distinctKeysWithErrors( streamsBuilder, streams, "deduplicateKeysStoreName", "deduplicateKeysRepartitionName", Duration.ZERO); - DeduplicationUtils.deduplicateKeyValues( + DeduplicationUtils.distinctByKeyValuesWithErrors( streamsBuilder, streams, "deduplicateKeyValuesStoreName", "deduplicateKeyValuesRepartitionName", Duration.ZERO); - DeduplicationUtils.deduplicateWithPredicate(streamsBuilder, streams, Duration.ofMillis(1), null); + DeduplicationUtils.distinctByPredicateWithErrors(streamsBuilder, streams, Duration.ofMillis(1), null); var enrichedStreams = streams.mapValues(KafkaStreamsStarterStub::enrichValue); var enrichedStreams2 = streams.mapValues(KafkaStreamsStarterStub::enrichValue2); From f4dd2f549b5b5c43d69e34bd8edbb90b57374a35 Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Tue, 14 Apr 2026 07:39:25 +0200 Subject: [PATCH 2/4] add Deduplication returning Kstream apply spotless --- .../deduplication/DedupKeyProcessor.java | 27 +++++--------- .../deduplication/DedupKeyValueProcessor.java | 26 +++++--------- .../DedupWithPredicateProcessor.java | 36 ++++++------------- .../DedupHeadersProcessorWithErrorsTest.java | 27 +++++++------- .../deduplication/DedupKeyProcessorTest.java | 2 -- .../DedupKeyProcessorWithErrorsTest.java | 21 ++++++----- .../DedupKeyValueProcessorTest.java | 25 ++++++------- .../DedupWithPredicateProcessorTest.java | 25 ++++++------- ...pWithPredicateProcessorWithErrorsTest.java | 3 +- 9 files changed, 74 insertions(+), 118 deletions(-) diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java 
b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java index 57a57257..ad980181 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java @@ -18,44 +18,34 @@ */ package com.michelin.kstreamplify.deduplication; +import java.time.Duration; +import java.time.Instant; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.WindowStore; -import java.time.Duration; -import java.time.Instant; - /** * Processor class for the deduplication mechanism on keys of a given topic. * * @param The type of the value */ -public class DedupKeyProcessor - implements Processor { +public class DedupKeyProcessor implements Processor { - /** - * Window store name, initialized @ construction. - */ + /** Window store name, initialized @ construction. */ private final String windowStoreName; - /** - * Retention window for the state store. Used for fetching data. - */ + /** Retention window for the state store. Used for fetching data. */ private final Duration retentionWindowDuration; - /** - * Context for this processor. - */ + /** Context for this processor. */ private ProcessorContext processorContext; - /** - * Window store containing all the records seen on the given window. - */ + /** Window store containing all the records seen on the given window. */ private WindowStore dedupWindowStore; /** * Constructor. 
* - * @param windowStoreName The name of the constructor + * @param windowStoreName The name of the constructor * @param retentionWindowDuration The retentionWindow Duration */ public DedupKeyProcessor(String windowStoreName, Duration retentionWindowDuration) { @@ -90,6 +80,5 @@ public void process(Record message) { // First time we see this record, store entry in the window store and forward the record to the output dedupWindowStore.put(message.key(), message.key(), message.timestamp()); processorContext.forward(message); - } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java index 339d2bcd..7b8247b0 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java @@ -18,44 +18,34 @@ */ package com.michelin.kstreamplify.deduplication; +import java.time.Duration; +import java.time.Instant; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.WindowStore; -import java.time.Duration; -import java.time.Instant; - /** * Processor class for the deduplication mechanism on both keys and values of a given topic. * * @param The type of the value */ -public class DedupKeyValueProcessor - implements Processor { +public class DedupKeyValueProcessor implements Processor { - /** - * Window store name, initialized @ construction. - */ + /** Window store name, initialized @ construction. */ private final String windowStoreName; - /** - * Retention window for the state store. Used for fetching data. - */ + /** Retention window for the state store. 
Used for fetching data. */ private final Duration retentionWindowDuration; - /** - * Kstream context for this processor. - */ + /** Kstream context for this processor. */ private ProcessorContext processorContext; - /** - * Window store containing all the records seen on the given window. - */ + /** Window store containing all the records seen on the given window. */ private WindowStore dedupWindowStore; /** * Constructor. * - * @param windowStoreName The window store name + * @param windowStoreName The window store name * @param retentionWindowHours The retention window duration */ public DedupKeyValueProcessor(String windowStoreName, Duration retentionWindowHours) { diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java index 0b71bd48..899270a6 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java @@ -18,51 +18,39 @@ */ package com.michelin.kstreamplify.deduplication; +import java.time.Duration; +import java.time.Instant; +import java.util.function.Function; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.state.WindowStore; -import java.time.Duration; -import java.time.Instant; -import java.util.function.Function; - /** * Processor class for the deduplication mechanism on predicate of a given topic. 
* * @param The type of the key * @param The type of the value */ -public class DedupWithPredicateProcessor - implements Processor { +public class DedupWithPredicateProcessor implements Processor { - /** - * Window store name, initialized @ construction. - */ + /** Window store name, initialized @ construction. */ private final String windowStoreName; - /** - * Retention window for the state store. Used for fetching data. - */ + /** Retention window for the state store. Used for fetching data. */ private final Duration retentionWindowDuration; - /** - * Deduplication key extractor. - */ + /** Deduplication key extractor. */ private final Function deduplicationKeyExtractor; - /** - * Kstream context for this processor. - */ + /** Kstream context for this processor. */ private ProcessorContext processorContext; - /** - * Window store containing all the records seen on the given window. - */ + /** Window store containing all the records seen on the given window. */ private WindowStore dedupWindowStore; /** * Constructor. 
* - * @param windowStoreName Name of the deduplication state store - * @param retentionWindowDuration Retention window duration + * @param windowStoreName Name of the deduplication state store + * @param retentionWindowDuration Retention window duration * @param deduplicationKeyExtractor Deduplication function */ public DedupWithPredicateProcessor( @@ -100,7 +88,5 @@ public void process(Record message) { // First time we see this record, store entry in the window store and forward the record to the output dedupWindowStore.put(identifier, message.value(), message.timestamp()); processorContext.forward(message); - - } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrorsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrorsTest.java index 36d81889..e0da9834 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrorsTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrorsTest.java @@ -18,8 +18,19 @@ */ package com.michelin.kstreamplify.deduplication; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import com.michelin.kstreamplify.avro.KafkaError; import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import java.util.List; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; @@ -31,19 +42,6 @@ import org.mockito.Mock; import 
org.mockito.junit.jupiter.MockitoExtension; -import java.time.Duration; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - @ExtendWith(MockitoExtension.class) class DedupHeadersProcessorWithErrorsTest { @@ -61,7 +59,8 @@ class DedupHeadersProcessorWithErrorsTest { @BeforeEach void setUp() { // Create an instance of DedupWithHeadersProcessor for testing - processor = new DedupHeadersProcessorWithErrors<>("testStore", Duration.ofHours(1), List.of("header-1", "header-3")); + processor = new DedupHeadersProcessorWithErrors<>( + "testStore", Duration.ofHours(1), List.of("header-1", "header-3")); // Stub the context.getStateStore method to return the mock store when(context.getStateStore("testStore")).thenReturn(windowStore); diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java index f18031d9..ea982929 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java @@ -29,7 +29,6 @@ import static org.mockito.Mockito.when; import com.michelin.kstreamplify.avro.KafkaError; -import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -109,6 +108,5 @@ void shouldThrowException() { doThrow(new 
RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); assertThrows(RuntimeException.class, () -> processor.process(message)); - } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrorsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrorsTest.java index 611e9d57..e0259232 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrorsTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrorsTest.java @@ -18,8 +18,18 @@ */ package com.michelin.kstreamplify.deduplication; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import com.michelin.kstreamplify.avro.KafkaError; import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; @@ -31,17 +41,6 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import java.time.Duration; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - @ExtendWith(MockitoExtension.class) class DedupKeyProcessorWithErrorsTest { diff 
--git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java index 33065887..26eaa02a 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java @@ -18,8 +18,18 @@ */ package com.michelin.kstreamplify.deduplication; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import com.michelin.kstreamplify.avro.KafkaError; -import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; @@ -31,18 +41,6 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import java.time.Duration; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - @ExtendWith(MockitoExtension.class) class DedupKeyValueProcessorTest { @@ -111,6 +109,5 @@ void shouldThrowException() { // Call the process method 
assertThrows(RuntimeException.class, () -> processor.process(message)); - } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java index 6392767a..d4d263f5 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java @@ -18,8 +18,18 @@ */ package com.michelin.kstreamplify.deduplication; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import com.michelin.kstreamplify.avro.KafkaError; -import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -32,18 +42,6 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import java.time.Duration; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - @ExtendWith(MockitoExtension.class) class 
DedupWithPredicateProcessorTest { private DedupWithPredicateProcessor processor; @@ -111,7 +109,6 @@ void shouldThrowException() { // Call the process method assertThrows(RuntimeException.class, () -> processor.process(message)); - } static class KeyExtractorStub { diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrorsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrorsTest.java index e052a079..c51e98b9 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrorsTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrorsTest.java @@ -58,7 +58,8 @@ class DedupWithPredicateProcessorWithErrorsTest { @BeforeEach void setUp() { // Create an instance of DedupWithPredicateProcessor for testing - processor = new DedupWithPredicateProcessorWithErrors<>("testStore", Duration.ofHours(1), KeyExtractorStub::extract); + processor = new DedupWithPredicateProcessorWithErrors<>( + "testStore", Duration.ofHours(1), KeyExtractorStub::extract); // Stub the context.getStateStore method to return the mock store when(context.getStateStore("testStore")).thenReturn(windowStore); From ea43211ba444c6a561a9eb2e6c7fb7704e26095f Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Wed, 15 Apr 2026 07:49:38 +0200 Subject: [PATCH 3/4] add Deduplication returning Kstream --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 850bd5ae..f5a7293b 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,7 @@ Kstreamplify adds extra features to Kafka Streams, simplifying development so yo * [By Key](#by-key) * [By Key and Value](#by-key-and-value) * [By Predicate](#by-predicate) + * [By Headers](#by-headers) * [OpenTelemetry](#opentelemetry) * [Custom Tags for Metrics](#custom-tags-for-metrics) * 
[Swagger](#swagger) From ebc313fcb9167e9422e5beadf4436ebf424f717c Mon Sep 17 00:00:00 2001 From: Sebastien Viale Date: Wed, 15 Apr 2026 07:51:17 +0200 Subject: [PATCH 4/4] add Deduplication returning Kstream --- .../kstreamplify/deduplication/DedupHeadersProcessor.java | 2 +- .../michelin/kstreamplify/deduplication/DeduplicationUtils.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java index ec0a3839..6f445d94 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java @@ -120,4 +120,4 @@ private String buildIdentifier(Headers headers) { .map(key -> getHeader(headers, key)) .collect(Collectors.joining("#")); } -} \ No newline at end of file +} diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java index fe8141e9..74ad3e1f 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java @@ -573,4 +573,4 @@ public static KStream distinctByHeaders( return repartitioned.process( () -> new DedupHeadersProcessor<>(storeName, windowDuration, deduplicationHeadersList), storeName); } -} \ No newline at end of file +}