Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 114 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ Kstreamplify adds extra features to Kafka Streams, simplifying development so yo
* [By Key](#by-key)
* [By Key and Value](#by-key-and-value)
* [By Predicate](#by-predicate)
* [By Headers](#by-headers)
* [OpenTelemetry](#opentelemetry)
* [Custom Tags for Metrics](#custom-tags-for-metrics)
* [Swagger](#swagger)
Expand Down Expand Up @@ -808,29 +809,74 @@ For more details about prefixes, see the [Prefix](#prefix) section.

The `DeduplicationUtils` class helps you deduplicate streams based on various criteria and within a specified time window.

All deduplication methods return a `KStream<String, ProcessingResult<V,V2>`, allowing you to handle errors and route them to `TopologyErrorHandler#catchErrors()`.
All `*withErrors` methods return a `KStream<String, ProcessingResult<V, V2>>`, allowing you to handle errors and route them to `TopologyErrorHandler#catchErrors()`.
Methods without `*withErrors` return a plain `KStream<String, V>` and can be used directly.

**Note**: Only streams with `String` keys and Avro values are supported.

#### By Key
#### By Key

##### With error handling

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
@Override
public void topology(StreamsBuilder streamsBuilder) {
KStream<String, KafkaUser> myStream = streamsBuilder
@Override
public void topology(StreamsBuilder streamsBuilder) {
KStream<String, KafkaUser> myStream = streamsBuilder
.stream("input_topic");

DeduplicationUtils
.deduplicateKeys(streamsBuilder, myStream, Duration.ofDays(60))
.to("output_topic");
}
TopologyErrorHandler
.catchErrors(
DeduplicationUtils
.distinctKeysWithErrors(streamsBuilder, myStream, Duration.ofDays(60))
)
.to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes()));
}
}
```

##### Without error handling

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
@Override
public void topology(StreamsBuilder streamsBuilder) {
KStream<String, KafkaUser> myStream = streamsBuilder
.stream("input_topic");

    DeduplicationUtils
        .distinctKeys(streamsBuilder, myStream, Duration.ofDays(60))
        .to("output_topic");
}
}
```

#### By Key and Value

##### With error handling

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
@Override
public void topology(StreamsBuilder streamsBuilder) {
KStream<String, KafkaUser> myStream = streamsBuilder
.stream("input_topic");

TopologyErrorHandler
.catchErrors(
DeduplicationUtils
.distinctByKeyValuesWithErrors(streamsBuilder, myStream, Duration.ofDays(60))
)
.to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes()));
}
}
```

##### Without error handling

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
Expand All @@ -840,14 +886,41 @@ public class MyKafkaStreams extends KafkaStreamsStarter {
.stream("input_topic");

DeduplicationUtils
.deduplicateKeyValues(streamsBuilder, myStream, Duration.ofDays(60))
.distinctByKeyValues(streamsBuilder, myStream, Duration.ofDays(60))
.to("output_topic");
}
}
```

#### By Predicate

##### With error handling

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
@Override
public void topology(StreamsBuilder streamsBuilder) {
KStream<String, KafkaUser> myStream = streamsBuilder
.stream("input_topic");

TopologyErrorHandler
.catchErrors(
DeduplicationUtils
.distinctByPredicateWithErrors(
streamsBuilder,
myStream,
Duration.ofDays(60),
value -> value.getFirstName() + "#" + value.getLastName()
)
)
.to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes()));
}
}
```

##### Without error handling

```java
@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
Expand All @@ -857,7 +930,7 @@ public class MyKafkaStreams extends KafkaStreamsStarter {
.stream("input_topic");

DeduplicationUtils
.deduplicateWithPredicate(streamsBuilder, myStream, Duration.ofDays(60),
.distinctByPredicate(streamsBuilder, myStream, Duration.ofDays(60),
value -> value.getFirstName() + "#" + value.getLastName())
.to("output_topic");
}
Expand All @@ -868,6 +941,35 @@ In the predicate approach, the provided predicate is used as the key in the wind

#### By Headers

##### With error handling

```java
import java.util.List;

@Component
public class MyKafkaStreams extends KafkaStreamsStarter {
@Override
public void topology(StreamsBuilder streamsBuilder) {
KStream<String, KafkaUser> myStream = streamsBuilder
.stream("input_topic");

TopologyErrorHandler
.catchErrors(
DeduplicationUtils
.distinctByHeadersWithErrors(
streamsBuilder,
myStream,
Duration.ofDays(60),
List.of("header1", "header2")
)
)
.to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes()));
}
}
```

##### Without error handling

```java
import java.util.List;

Expand All @@ -879,7 +981,7 @@ public class MyKafkaStreams extends KafkaStreamsStarter {
.stream("input_topic");

DeduplicationUtils
.deduplicateWithHeaders(streamsBuilder, myStream, Duration.ofDays(60),
.distinctByHeaders(streamsBuilder, myStream, Duration.ofDays(60),
List.of("header1", "header2"))
.to("output_topic");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package com.michelin.kstreamplify.deduplication;

import com.michelin.kstreamplify.error.ProcessingResult;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -34,21 +33,20 @@
import org.apache.kafka.streams.state.WindowStore;

/**
* Transformer class for the deduplication mechanism on headers of a given topic.
* Processor class for the deduplication mechanism on headers of a given topic.
*
* @param <V> The type of the value
*/
public class DedupHeadersProcessor<V extends SpecificRecord>
implements Processor<String, V, String, ProcessingResult<V, V>> {
public class DedupHeadersProcessor<V extends SpecificRecord> implements Processor<String, V, String, V> {

/** Window store name, initialized @ construction. */
private final String windowStoreName;
/** Retention window for the state store. Used for fetching data. */
private final Duration retentionWindowDuration;
/** Deduplication headers list. */
private final List<String> deduplicationHeadersList;
/** Kstream context for this transformer. */
private ProcessorContext<String, ProcessingResult<V, V>> processorContext;
/** Kstream context for this processor. */
private ProcessorContext<String, V> processorContext;
/** Window store containing all the records seen on the given window. */
private WindowStore<String, String> dedupWindowStore;

Expand All @@ -66,41 +64,49 @@ public DedupHeadersProcessor(
this.deduplicationHeadersList = deduplicationHeadersList;
}

/**
* Get the header value for a given key
*
* @param headers headers of the record
* @param key the key to look for in the headers
* @return The header value for the given key, or an empty string if the header is not present or has no value.
*/
private static String getHeader(Headers headers, String key) {
Header header = headers.lastHeader(key);
if (header == null || header.value() == null) {
return StringUtils.EMPTY;
}
String value = new String(header.value(), StandardCharsets.UTF_8);
return StringUtils.defaultString(value);
}

@Override
public void init(ProcessorContext<String, ProcessingResult<V, V>> context) {
public void init(ProcessorContext<String, V> context) {
this.processorContext = context;
dedupWindowStore = this.processorContext.getStateStore(windowStoreName);
}

@Override
public void process(Record<String, V> message) {
try {
// Get the record timestamp
var currentInstant = Instant.ofEpochMilli(message.timestamp());
String identifier = buildIdentifier(message.headers());
// Get the record timestamp
var currentInstant = Instant.ofEpochMilli(message.timestamp());
String identifier = buildIdentifier(message.headers());

// Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate)
try (var resultIterator = dedupWindowStore.backwardFetch(
identifier,
currentInstant.minus(retentionWindowDuration),
currentInstant.plus(retentionWindowDuration))) {
while (resultIterator != null && resultIterator.hasNext()) {
var currentKeyValue = resultIterator.next();
if (identifier.equals(currentKeyValue.value)) {
return;
}
// Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate)
try (var resultIterator = dedupWindowStore.backwardFetch(
identifier,
currentInstant.minus(retentionWindowDuration),
currentInstant.plus(retentionWindowDuration))) {
while (resultIterator != null && resultIterator.hasNext()) {
var currentKeyValue = resultIterator.next();
if (identifier.equals(currentKeyValue.value)) {
return;
}
}
// First time we see this record, store entry in the window store and forward the record to the output
dedupWindowStore.put(identifier, identifier, message.timestamp());
processorContext.forward(ProcessingResult.wrapRecordSuccess(message));
} catch (Exception e) {
processorContext.forward(ProcessingResult.wrapRecordFailure(
e,
message,
"Could not figure out what to do with the current payload: "
+ "An unlikely error occurred during deduplication transform"));
}
// First time we see this record, store entry in the window store and forward the record to the output
dedupWindowStore.put(identifier, identifier, message.timestamp());
processorContext.forward(message);
}

/**
Expand All @@ -114,20 +120,4 @@ private String buildIdentifier(Headers headers) {
.map(key -> getHeader(headers, key))
.collect(Collectors.joining("#"));
}

/**
* Get the header value for a given key
*
* @param headers headers of the record
* @param key the key to look for in the headers
* @return The header value for the given key, or an empty string if the header is not present or has no value.
*/
private static String getHeader(Headers headers, String key) {
Header header = headers.lastHeader(key);
if (header == null || header.value() == null) {
return StringUtils.EMPTY;
}
String value = new String(header.value(), StandardCharsets.UTF_8);
return StringUtils.defaultString(value);
}
}
Loading
Loading