Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
Expand Down Expand Up @@ -82,6 +83,7 @@ private class KStreamSessionWindowAggregateProcessor extends AbstractProcessor<K
private StreamsMetricsImpl metrics;
private InternalProcessorContext internalProcessorContext;
private Sensor lateRecordDropSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;

@SuppressWarnings("unchecked")
@Override
Expand All @@ -108,7 +110,8 @@ value, context().topic(), context().partition(), context().offset()
return;
}

final long closeTime = internalProcessorContext.streamTime() - windows.gracePeriodMs();
observedStreamTime = Math.max(observedStreamTime, context().timestamp());
final long closeTime = observedStreamTime - windows.gracePeriodMs();

final long timestamp = context().timestamp();
final List<KeyValue<Windowed<K>, Agg>> merged = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
Expand Down Expand Up @@ -75,6 +76,7 @@ private class KStreamWindowAggregateProcessor extends AbstractProcessor<K, V> {
private StreamsMetricsImpl metrics;
private InternalProcessorContext internalProcessorContext;
private Sensor lateRecordDropSensor;
private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;

@SuppressWarnings("unchecked")
@Override
Expand Down Expand Up @@ -103,7 +105,8 @@ value, context().topic(), context().partition(), context().offset()

// first get the matching windows
final long timestamp = context().timestamp();
final long closeTime = internalProcessorContext.streamTime() - windows.gracePeriodMs();
observedStreamTime = Math.max(observedStreamTime, timestamp);
final long closeTime = observedStreamTime - windows.gracePeriodMs();

final Map<Long, W> matchedWindows = windows.windowsFor(timestamp);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals.suppress;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
Expand All @@ -40,14 +41,17 @@ public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
private final long suppressDurationMillis;
private final TimeDefinition<K> bufferTimeDefinition;
private final BufferFullStrategy bufferFullStrategy;
private final boolean shouldSuppressTombstones;
private final boolean safeToDropTombstones;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an unrelated change in trunk/2.2 to improve the code legibility.

I'm proposing to backport it to keep the Suppress code consistent and improve maintainability.

private final String storeName;

private TimeOrderedKeyValueBuffer buffer;
private InternalProcessorContext internalProcessorContext;

private Serde<K> keySerde;
private FullChangeSerde<V> valueSerde;

private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;

public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
final String storeName,
final Serde<K> keySerde,
Expand All @@ -61,7 +65,7 @@ public KTableSuppressProcessor(final SuppressedInternal<K> suppress,
suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis();
bufferTimeDefinition = suppress.timeDefinition();
bufferFullStrategy = suppress.bufferConfig().bufferFullStrategy();
shouldSuppressTombstones = suppress.shouldSuppressTombstones();
safeToDropTombstones = suppress.safeToDropTombstones();
}

@SuppressWarnings("unchecked")
Expand All @@ -75,6 +79,7 @@ public void init(final ProcessorContext context) {

@Override
public void process(final K key, final Change<V> value) {
// Track the highest record timestamp this processor has observed; this serves
// as the per-processor stream time used when enforcing suppression constraints.
// NOTE(review): assumes internalProcessorContext.timestamp() is the current
// record's timestamp -- confirm against the ProcessorContext contract.
observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp());
buffer(key, value);
enforceConstraints();
}
Expand All @@ -90,7 +95,7 @@ private void buffer(final K key, final Change<V> value) {
}

private void enforceConstraints() {
final long streamTime = internalProcessorContext.streamTime();
final long streamTime = observedStreamTime;
final long expiryTime = streamTime - suppressDurationMillis;

buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit);
Expand Down Expand Up @@ -130,7 +135,7 @@ private void emit(final KeyValue<Bytes, ContextualRecord> toEmit) {
}

private boolean shouldForward(final Change<V> value) {
return !(value.newValue == null && shouldSuppressTombstones);
return value.newValue != null || !safeToDropTombstones;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,36 @@ public class SuppressedInternal<K> implements Suppressed<K> {
private final BufferConfigInternal bufferConfig;
private final Duration timeToWaitForMoreEvents;
private final TimeDefinition<K> timeDefinition;
private final boolean suppressTombstones;
private final boolean safeToDropTombstones;

/**
* @param safeToDropTombstones Note: it's *only* safe to drop tombstones for windowed KTables in "final results" mode.
* In that case, we have a priori knowledge that we have never before emitted any
* results for a given key, and therefore the tombstone is unnecessary (albeit
* idempotent and correct). We decided that the unnecessary tombstones would not be
* desirable in the output stream, though, hence the ability to drop them.
*
* An alternative is to remember whether a result has previously been emitted
* for a key and drop tombstones in that case, but it would be a little complicated to
* figure out when to forget the fact that we have emitted some result (currently, the
* buffer immediately forgets all about a key when we emit, which helps to keep it
* compact).
*/
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise, this is part of the Suppress code legibility change.

public SuppressedInternal(final String name,
final Duration suppressionTime,
final BufferConfig bufferConfig,
final TimeDefinition<K> timeDefinition,
final boolean suppressTombstones) {
final boolean safeToDropTombstones) {
this.name = name;
this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime;
this.timeDefinition = timeDefinition == null ? TimeDefinitions.RecordTimeDefintion.instance() : timeDefinition;
this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : (BufferConfigInternal) bufferConfig;
this.suppressTombstones = suppressTombstones;
this.safeToDropTombstones = safeToDropTombstones;
}

@Override
public Suppressed<K> withName(final String name) {
return new SuppressedInternal<>(name, timeToWaitForMoreEvents, bufferConfig, timeDefinition, suppressTombstones);
return new SuppressedInternal<>(name, timeToWaitForMoreEvents, bufferConfig, timeDefinition, safeToDropTombstones);
}

public String name() {
Expand All @@ -65,16 +78,20 @@ Duration timeToWaitForMoreEvents() {
return timeToWaitForMoreEvents == null ? Duration.ZERO : timeToWaitForMoreEvents;
}

boolean shouldSuppressTombstones() {
return suppressTombstones;
boolean safeToDropTombstones() {
return safeToDropTombstones;
}

@Override
public boolean equals(final Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final SuppressedInternal<?> that = (SuppressedInternal<?>) o;
return suppressTombstones == that.suppressTombstones &&
return safeToDropTombstones == that.safeToDropTombstones &&
Objects.equals(name, that.name) &&
Objects.equals(bufferConfig, that.bufferConfig) &&
Objects.equals(timeToWaitForMoreEvents, that.timeToWaitForMoreEvents) &&
Expand All @@ -83,7 +100,7 @@ public boolean equals(final Object o) {

@Override
public int hashCode() {
return Objects.hash(name, bufferConfig, timeToWaitForMoreEvents, timeDefinition, suppressTombstones);
return Objects.hash(name, bufferConfig, timeToWaitForMoreEvents, timeDefinition, safeToDropTombstones);
}

@Override
Expand All @@ -92,7 +109,7 @@ public String toString() {
", bufferConfig=" + bufferConfig +
", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents +
", timeDefinition=" + timeDefinition +
", suppressTombstones=" + suppressTombstones +
", safeToDropTombstones=" + safeToDropTombstones +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,4 @@ public Cancellable schedule(final long interval, final PunctuationType type, fin
// Punctuator scheduling is not supported for the global processor context;
// this overload always fails fast rather than silently ignoring the request.
public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) {
throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context.");
}

@Override
public long streamTime() {
throw new RuntimeException("Stream time is not implemented for the global processor context.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,4 @@ public interface InternalProcessorContext extends ProcessorContext {
* Mark this context as being uninitialized
*/
void uninitialize();

long streamTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re

private final StreamTask task;
private final RecordCollector collector;
private TimestampSupplier streamTimeSupplier;
private final ToInternal toInternal = new ToInternal();
private final static To SEND_TO_ALL = To.all();

Expand Down Expand Up @@ -165,13 +164,4 @@ public Cancellable schedule(final Duration interval,
return schedule(interval.toMillis(), type, callback);
}

void setStreamTimeSupplier(final TimestampSupplier streamTimeSupplier) {
this.streamTimeSupplier = streamTimeSupplier;
}

@Override
public long streamTime() {
return streamTimeSupplier.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
*/
public class RecordQueue {

static final long UNKNOWN = -1L;
static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP;

private final Logger log;
private final SourceNode source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ public Map<TopicPartition, Long> offsets() {
}
};

private long streamTime = RecordQueue.UNKNOWN;

StandbyContextImpl(final TaskId id,
final StreamsConfig config,
final ProcessorStateManager stateMgr,
Expand Down Expand Up @@ -231,14 +229,4 @@ public void setCurrentNode(final ProcessorNode currentNode) {
public ProcessorNode currentNode() {
throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks.");
}

void updateStreamTime(final long streamTime) {
this.streamTime = Math.max(this.streamTime, streamTime);
}

@Override
public long streamTime() {
return streamTime;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
public class StandbyTask extends AbstractTask {

private Map<TopicPartition, Long> checkpointedOffsets = new HashMap<>();
private final StandbyContextImpl standbyContext;

/**
* Create {@link StandbyTask} with its assigned partitions
Expand All @@ -60,7 +59,7 @@ public class StandbyTask extends AbstractTask {
final StateDirectory stateDirectory) {
super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config);

processorContext = standbyContext = new StandbyContextImpl(id, config, stateMgr, metrics);
processorContext = new StandbyContextImpl(id, config, stateMgr, metrics);
}

@Override
Expand Down Expand Up @@ -120,7 +119,7 @@ public void suspend() {

private void flushAndCheckpointState() {
stateMgr.flush();
stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
stateMgr.checkpoint(Collections.emptyMap());
}

/**
Expand Down Expand Up @@ -177,9 +176,6 @@ public List<ConsumerRecord<byte[], byte[]>> update(final TopicPartition partitio
if (record.offset() < limit) {
restoreRecords.add(record);
lastOffset = record.offset();
// ideally, we'd use the stream time at the time of the change logging, but we'll settle for
// record timestamp for now.
standbyContext.updateStreamTime(record.timestamp());
} else {
remainingRecords.add(record);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,6 @@ public StreamTask(final TaskId id,

recordInfo = new PartitionGroup.RecordInfo();
partitionGroup = new PartitionGroup(partitionQueues, recordLatenessSensor(processorContextImpl));
processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp);

stateMgr.registerGlobalStateStores(topology.globalStateStores());

Expand Down

This file was deleted.

Loading