diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index b89399bf7b7bf..f7802d66ad7a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; @@ -82,6 +83,7 @@ private class KStreamSessionWindowAggregateProcessor extends AbstractProcessor, Agg>> merged = new ArrayList<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index f29251573e2de..c5b2483847582 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; @@ -75,6 +76,7 @@ private class KStreamWindowAggregateProcessor extends AbstractProcessor { private StreamsMetricsImpl metrics; private InternalProcessorContext internalProcessorContext; private Sensor lateRecordDropSensor; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; @SuppressWarnings("unchecked") @Override @@ -103,7 +105,8 @@ value, context().topic(), context().partition(), context().offset() // first get the matching windows final long timestamp = context().timestamp(); - final long closeTime = internalProcessorContext.streamTime() - windows.gracePeriodMs(); + observedStreamTime = Math.max(observedStreamTime, timestamp); + final long closeTime = observedStreamTime - windows.gracePeriodMs(); final Map matchedWindows = windows.windowsFor(timestamp); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java index 50e74a38fd671..4058083dc1bdc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals.suppress; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -40,14 +41,17 @@ public class KTableSuppressProcessor implements Processor> { private final long suppressDurationMillis; private final TimeDefinition bufferTimeDefinition; private final BufferFullStrategy bufferFullStrategy; - private final boolean shouldSuppressTombstones; + private final boolean safeToDropTombstones; private final String storeName; + private TimeOrderedKeyValueBuffer buffer; 
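Aside: the two windowed aggregators above, and the suppression processor whose fields continue below, all adopt the same idiom: each stateful processor now tracks stream time itself, as the maximum record timestamp it has observed, instead of querying the context. A minimal self-contained distillation of that idiom; the class and method names here (StreamTimeTracker, observe, isExpired) are illustrative, not part of the diff:

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    class StreamTimeTracker {
        // NO_TIMESTAMP (-1) marks "no records observed yet"
        private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;

        // Called once per record. Stream time only moves forward, so a
        // late (out-of-order) record never rewinds it.
        void observe(final long recordTimestamp) {
            observedStreamTime = Math.max(observedStreamTime, recordTimestamp);
        }

        // Mirrors the closeTime computation above: a window is expired once
        // stream time, less the grace period, has reached its end.
        boolean isExpired(final long windowEnd, final long gracePeriodMs) {
            return windowEnd <= observedStreamTime - gracePeriodMs;
        }
    }
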
private InternalProcessorContext internalProcessorContext; private Serde keySerde; private FullChangeSerde valueSerde; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + public KTableSuppressProcessor(final SuppressedInternal suppress, final String storeName, final Serde keySerde, @@ -61,7 +65,7 @@ public KTableSuppressProcessor(final SuppressedInternal suppress, suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis(); bufferTimeDefinition = suppress.timeDefinition(); bufferFullStrategy = suppress.bufferConfig().bufferFullStrategy(); - shouldSuppressTombstones = suppress.shouldSuppressTombstones(); + safeToDropTombstones = suppress.safeToDropTombstones(); } @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext context) { @@ -75,6 +79,7 @@ public void init(final ProcessorContext context) { @Override public void process(final K key, final Change value) { + observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp()); buffer(key, value); enforceConstraints(); } @@ -90,7 +95,7 @@ private void buffer(final K key, final Change value) { } private void enforceConstraints() { - final long streamTime = internalProcessorContext.streamTime(); + final long streamTime = observedStreamTime; final long expiryTime = streamTime - suppressDurationMillis; buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit); @@ -130,7 +135,7 @@ private void emit(final KeyValue toEmit) { } private boolean shouldForward(final Change value) { - return !(value.newValue == null && shouldSuppressTombstones); + return value.newValue != null || !safeToDropTombstones; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java index 74534756b94b4..c3877007c5a42 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java @@ -30,23 +30,36 @@ public class SuppressedInternal implements Suppressed { private final BufferConfigInternal bufferConfig; private final Duration timeToWaitForMoreEvents; private final TimeDefinition timeDefinition; - private final boolean suppressTombstones; + private final boolean safeToDropTombstones; + /** + * @param safeToDropTombstones Note: it's *only* safe to drop tombstones for windowed KTables in "final results" mode. + * In that case, we have a priori knowledge that we have never before emitted any + * results for a given key, and therefore the tombstone is unnecessary (albeit + * idempotent and correct). We decided that the unnecessary tombstones would not be + * desirable in the output stream, though, hence the ability to drop them. + * + * An alternative is to remember whether a result has previously been emitted + * for a key and drop tombstones in that case, but it would be a little complicated to + * figure out when to forget the fact that we have emitted some result (currently, the + * buffer immediately forgets all about a key when we emit, which helps to keep it + * compact). + */ public SuppressedInternal(final String name, final Duration suppressionTime, final BufferConfig bufferConfig, final TimeDefinition timeDefinition, - final boolean suppressTombstones) { + final boolean safeToDropTombstones) { this.name = name; this.timeToWaitForMoreEvents = suppressionTime == null ?
DEFAULT_SUPPRESSION_TIME : suppressionTime; this.timeDefinition = timeDefinition == null ? TimeDefinitions.RecordTimeDefintion.instance() : timeDefinition; this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : (BufferConfigInternal) bufferConfig; - this.suppressTombstones = suppressTombstones; + this.safeToDropTombstones = safeToDropTombstones; } @Override public Suppressed withName(final String name) { - return new SuppressedInternal<>(name, timeToWaitForMoreEvents, bufferConfig, timeDefinition, suppressTombstones); + return new SuppressedInternal<>(name, timeToWaitForMoreEvents, bufferConfig, timeDefinition, safeToDropTombstones); } public String name() { @@ -65,16 +78,20 @@ Duration timeToWaitForMoreEvents() { return timeToWaitForMoreEvents == null ? Duration.ZERO : timeToWaitForMoreEvents; } - boolean shouldSuppressTombstones() { - return suppressTombstones; + boolean safeToDropTombstones() { + return safeToDropTombstones; } @Override public boolean equals(final Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } final SuppressedInternal that = (SuppressedInternal) o; - return suppressTombstones == that.suppressTombstones && + return safeToDropTombstones == that.safeToDropTombstones && Objects.equals(name, that.name) && Objects.equals(bufferConfig, that.bufferConfig) && Objects.equals(timeToWaitForMoreEvents, that.timeToWaitForMoreEvents) && @@ -83,7 +100,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(name, bufferConfig, timeToWaitForMoreEvents, timeDefinition, suppressTombstones); + return Objects.hash(name, bufferConfig, timeToWaitForMoreEvents, timeDefinition, safeToDropTombstones); } @Override @@ -92,7 +109,7 @@ public String toString() { ", bufferConfig=" + bufferConfig + ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents + ", timeDefinition=" + timeDefinition + - ", suppressTombstones=" + suppressTombstones + + ", safeToDropTombstones=" + safeToDropTombstones + '}'; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 5c5b84f155d67..a39fafdea6e9a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -105,9 +105,4 @@ public Cancellable schedule(final long interval, final PunctuationType type, fin public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) { throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context."); } - - @Override - public long streamTime() { - throw new RuntimeException("Stream time is not implemented for the global processor context."); - } -} +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java index 98511fd53dec2..0f67dff9fa22c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -63,6 +63,4 @@ public interface InternalProcessorContext extends ProcessorContext { * Mark this context as being uninitialized */ void uninitialize(); - - long streamTime(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 21e1c17a8985f..0ed82decc4d83 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -35,7 +35,6 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re private final StreamTask task; private final RecordCollector collector; - private TimestampSupplier streamTimeSupplier; private final ToInternal toInternal = new ToInternal(); private final static To SEND_TO_ALL = To.all(); @@ -165,13 +164,4 @@ public Cancellable schedule(final Duration interval, return schedule(interval.toMillis(), type, callback); } - void setStreamTimeSupplier(final TimestampSupplier streamTimeSupplier) { - this.streamTimeSupplier = streamTimeSupplier; - } - - @Override - public long streamTime() { - return streamTimeSupplier.get(); - } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index d06d7f3bad654..572e6292ea29e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -36,7 +36,7 @@ */ public class RecordQueue { - static final long UNKNOWN = -1L; + static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP; private final Logger log; private final SourceNode source; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 6b835d97a3fb1..ee693739dc7aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -75,8 +75,6 @@ public Map offsets() { } }; - private long streamTime = RecordQueue.UNKNOWN; - StandbyContextImpl(final TaskId id, final StreamsConfig config, final ProcessorStateManager stateMgr, @@ -231,14 +229,4 @@ public void setCurrentNode(final ProcessorNode currentNode) { public ProcessorNode currentNode() { throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks."); } - - void updateStreamTime(final long streamTime) { - this.streamTime = Math.max(this.streamTime, streamTime); - } - - @Override - public long streamTime() { - return streamTime; - } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 45f06b2bec563..bd8ba34cb151b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -37,7 +37,6 @@ public class StandbyTask extends AbstractTask { private Map checkpointedOffsets = new 
HashMap<>(); - private final StandbyContextImpl standbyContext; /** * Create {@link StandbyTask} with its assigned partitions @@ -60,7 +59,7 @@ public class StandbyTask extends AbstractTask { final StateDirectory stateDirectory) { super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config); - processorContext = standbyContext = new StandbyContextImpl(id, config, stateMgr, metrics); + processorContext = new StandbyContextImpl(id, config, stateMgr, metrics); } @Override @@ -120,7 +119,7 @@ public void suspend() { private void flushAndCheckpointState() { stateMgr.flush(); - stateMgr.checkpoint(Collections.emptyMap()); + stateMgr.checkpoint(Collections.emptyMap()); } /** @@ -177,9 +176,6 @@ public List> update(final TopicPartition partitio if (record.offset() < limit) { restoreRecords.add(record); lastOffset = record.offset(); - // ideally, we'd use the stream time at the time of the change logging, but we'll settle for - // record timestamp for now. - standbyContext.updateStreamTime(record.timestamp()); } else { remainingRecords.add(record); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 247a156eb3809..c4fecef4285ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -236,7 +236,6 @@ public StreamTask(final TaskId id, recordInfo = new PartitionGroup.RecordInfo(); partitionGroup = new PartitionGroup(partitionQueues, recordLatenessSensor(processorContextImpl)); - processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp); stateMgr.registerGlobalStateStores(topology.globalStateStores()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java deleted file mode 100644 index a6a7a42674e29..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.kafka.streams.processor.internals; - -public interface TimestampSupplier { - long get(); -} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index 17079b964b505..c4fce72d78094 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; @@ -51,6 +52,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore { private volatile boolean open; private Set bulkLoadSegments; private Sensor expiredRecordSensor; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; RocksDBSegmentedBytesStore(final String name, final String metricScope, @@ -108,7 +110,9 @@ public KeyValueIterator fetchAll(final long timeFrom, final long @Override public void remove(final Bytes key) { - final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key)); + final long timestamp = keySchema.segmentTimestamp(key); + observedStreamTime = Math.max(observedStreamTime, timestamp); + final Segment segment = segments.getSegmentForTimestamp(timestamp); if (segment == null) { return; } @@ -118,8 +122,9 @@ public void remove(final Bytes key) { @Override public void put(final Bytes key, final byte[] value) { final long timestamp = keySchema.segmentTimestamp(key); + observedStreamTime = Math.max(observedStreamTime, timestamp); final long segmentId = segments.segmentId(timestamp); - final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context); + final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime); if (segment == null) { expiredRecordSensor.record(); LOG.debug("Skipping record for expired segment."); @@ -163,7 +168,7 @@ public void init(final ProcessorContext context, final StateStore root) { "expired-window-record-drop" ); - segments.openExisting(this.context); + segments.openExisting(this.context, observedStreamTime); bulkLoadSegments = new HashSet<>(segments.allSegments()); @@ -215,10 +220,17 @@ void restoreAllInternal(final Collection> records) { // Visible for testing Map getWriteBatches(final Collection> records) { + // advance stream time to the max timestamp in the batch + for (final KeyValue record : records) { + final long timestamp = keySchema.segmentTimestamp(Bytes.wrap(record.key)); + observedStreamTime = Math.max(observedStreamTime, timestamp); + } + final Map writeBatchMap = new HashMap<>(); for (final KeyValue record : records) { - final long segmentId = segments.segmentId(keySchema.segmentTimestamp(Bytes.wrap(record.key))); - final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context); + final long timestamp = keySchema.segmentTimestamp(Bytes.wrap(record.key)); + final long segmentId = segments.segmentId(timestamp); + final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime); if (segment != null) { // This handles the case that state store is moved to a new client and does not // have the local RocksDB instance for the segment. 
In this case, toggleDBForBulkLoading @@ -247,7 +259,7 @@ Map getWriteBatches(final Collection value = ARBITRARY_CHANGE; processor.process(key, value); @@ -117,7 +116,6 @@ public void windowedZeroTimeLimitShouldImmediatelyEmit() { final long timestamp = ARBITRARY_LONG; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new TimeWindow(0L, 100L)); final Change value = ARBITRARY_CHANGE; processor.process(key, value); @@ -137,14 +135,12 @@ public void intermediateSuppressionShouldBufferAndEmitLater() { final long timestamp = 0L; context.setRecordMetadata("topic", 0, 0, null, timestamp); - context.setStreamTime(timestamp); final String key = "hey"; final Change value = new Change<>(null, 1L); processor.process(key, value); assertThat(context.forwarded(), hasSize(0)); context.setRecordMetadata("topic", 0, 1, null, 1L); - context.setStreamTime(1L); processor.process("tick", new Change<>(null, null)); assertThat(context.forwarded(), hasSize(1)); @@ -164,7 +160,6 @@ public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() { final long recordTime = 99L; final long windowEnd = 100L; context.setRecordMetadata("topic", 0, 0, null, recordTime); - context.setStreamTime(recordTime); final Windowed key = new Windowed<>("hey", new TimeWindow(windowStart, windowEnd)); final Change value = ARBITRARY_CHANGE; processor.process(key, value); @@ -176,7 +171,6 @@ public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() { final long recordTime2 = 100L; final long windowEnd2 = 101L; context.setRecordMetadata("topic", 0, 1, null, recordTime2); - context.setStreamTime(recordTime2); processor.process(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE); assertThat(context.forwarded(), hasSize(0)); @@ -185,7 +179,6 @@ public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() { final long recordTime3 = 101L; final long windowEnd3 = 102L; context.setRecordMetadata("topic", 0, 1, null, recordTime3); - context.setStreamTime(recordTime3); processor.process(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE); assertThat(context.forwarded(), hasSize(1)); @@ -212,14 +205,12 @@ public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() { final long streamTime = 99L; final long windowEnd = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(streamTime); final Windowed key = new Windowed<>("hey", new TimeWindow(0, windowEnd)); final Change value = ARBITRARY_CHANGE; processor.process(key, value); assertThat(context.forwarded(), hasSize(0)); context.setRecordMetadata("", 0, 1L, null, windowEnd); - context.setStreamTime(windowEnd); processor.process(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE); assertThat(context.forwarded(), hasSize(1)); @@ -237,7 +228,6 @@ public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() { final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new TimeWindow(0, 100L)); final Change value = ARBITRARY_CHANGE; processor.process(key, value); @@ -257,7 +247,6 @@ public void finalResultsShouldSuppressTombstonesForTimeWindows() { final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new 
TimeWindow(0, 100L)); final Change value = new Change<>(null, ARBITRARY_LONG); processor.process(key, value); @@ -274,7 +263,6 @@ public void finalResultsShouldSuppressTombstonesForSessionWindows() { final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new SessionWindow(0L, 0L)); final Change value = new Change<>(null, ARBITRARY_LONG); processor.process(key, value); @@ -291,7 +279,6 @@ public void suppressShouldNotSuppressTombstonesForTimeWindows() { final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new TimeWindow(0L, 100L)); final Change value = new Change<>(null, ARBITRARY_LONG); processor.process(key, value); @@ -311,7 +298,6 @@ public void suppressShouldNotSuppressTombstonesForSessionWindows() { final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new SessionWindow(0L, 0L)); final Change value = new Change<>(null, ARBITRARY_LONG); processor.process(key, value); @@ -331,7 +317,6 @@ public void suppressShouldNotSuppressTombstonesForKTable() { final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final String key = "hey"; final Change value = new Change<>(null, ARBITRARY_LONG); processor.process(key, value); @@ -350,7 +335,6 @@ public void suppressShouldEmitWhenOverRecordCapacity() { final KTableSuppressProcessor processor = harness.processor; final long timestamp = 100L; - context.setStreamTime(timestamp); context.setRecordMetadata("", 0, 0L, null, timestamp); final String key = "hey"; final Change value = new Change<>(null, ARBITRARY_LONG); @@ -373,7 +357,6 @@ public void suppressShouldEmitWhenOverByteCapacity() { final KTableSuppressProcessor processor = harness.processor; final long timestamp = 100L; - context.setStreamTime(timestamp); context.setRecordMetadata("", 0, 0L, null, timestamp); final String key = "hey"; final Change value = new Change<>(null, ARBITRARY_LONG); @@ -396,7 +379,6 @@ public void suppressShouldShutDownWhenOverRecordCapacity() { final KTableSuppressProcessor processor = harness.processor; final long timestamp = 100L; - context.setStreamTime(timestamp); context.setRecordMetadata("", 0, 0L, null, timestamp); context.setCurrentNode(new ProcessorNode("testNode")); final String key = "hey"; @@ -420,7 +402,6 @@ public void suppressShouldShutDownWhenOverByteCapacity() { final KTableSuppressProcessor processor = harness.processor; final long timestamp = 100L; - context.setStreamTime(timestamp); context.setRecordMetadata("", 0, 0L, null, timestamp); context.setCurrentNode(new ProcessorNode("testNode")); final String key = "hey"; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 4ce9a9f4f177c..43df1d210239d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -224,10 +224,5 @@ public void forward(final K key, final V value, final String childName) { @Override public void commit() {} - - @Override - public long streamTime() { - throw new 
RuntimeException("not implemented"); - } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index ee0069a4072c0..0ee71887f6c50 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -81,6 +81,7 @@ public class RocksDBSegmentedBytesStoreTest { private RocksDBSegmentedBytesStore bytesStore; private File stateDir; private final Window[] windows = new Window[4]; + private Window nextSegmentWindow; @Parameter public SegmentedBytesStore.KeySchema schema; @@ -98,12 +99,22 @@ public void before() { windows[1] = new SessionWindow(500L, 1000L); windows[2] = new SessionWindow(1_000L, 1_500L); windows[3] = new SessionWindow(30_000L, 60_000L); + // All four of the previous windows will go into segment 1. + // The nextSegmentWindow is computed be a high enough time that when it gets written + // to the segment store, it will advance stream time past the first segment's retention time and + // expire it. + nextSegmentWindow = new SessionWindow(segmentInterval + retention, segmentInterval + retention); } if (schema instanceof WindowKeySchema) { windows[0] = timeWindowForSize(10L, windowSizeForTimeWindow); windows[1] = timeWindowForSize(500L, windowSizeForTimeWindow); windows[2] = timeWindowForSize(1_000L, windowSizeForTimeWindow); windows[3] = timeWindowForSize(60_000L, windowSizeForTimeWindow); + // All four of the previous windows will go into segment 1. + // The nextSegmentWindow is computed be a high enough time that when it gets written + // to the segment store, it will advance stream time past the first segment's retention time and + // expire it. + nextSegmentWindow = timeWindowForSize(segmentInterval + retention, windowSizeForTimeWindow); } @@ -415,8 +426,13 @@ public void shouldLogAndMeasureExpiredRecords() { LogCaptureAppender.setClassLoggerToDebug(RocksDBSegmentedBytesStore.class); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); - context.setStreamTime(Math.max(retention, segmentInterval) * 2); - bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(5)); + // write a record to advance stream time, with a high enough timestamp + // that the subsequent record in windows[0] will already be expired. 
+ bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0)); + + final Bytes key = serializeKey(new Windowed<>("a", windows[0])); + final byte[] value = serializeValue(5); + bytesStore.put(key, value); LogCaptureAppender.unregister(appender); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index c41b094230122..588f12f7b2a72 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -159,7 +159,6 @@ public void shouldOnlyIterateOpenSegments() { private void setCurrentTime(final long currentTime) { context.setRecordContext(createRecordContext(currentTime)); - context.setStreamTime(currentTime); } private ProcessorRecordContext createRecordContext(final long time) { @@ -727,9 +726,11 @@ public void testInitialLoading() { new File(storeDir, segments.segmentName(6L)).mkdir(); windowStore.close(); - context.setStreamTime(segmentInterval * 6L); windowStore = createWindowStore(context, false); + // put something in the store to advance its stream time and expire the old segments + windowStore.put(1, "v", 6L * segmentInterval); + final List expected = Utils.mkList(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)); expected.sort(String::compareTo); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java index efed24f49e9fe..0c16457b229bd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java @@ -94,9 +94,9 @@ public void shouldGetSegmentNameFromId() { @Test public void shouldCreateSegments() { - final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context); - final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context); - final Segment segment3 = segments.getOrCreateSegmentIfLive(2, context); + final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L); + final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L); + final Segment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L); assertTrue(new File(context.stateDir(), "test/test.0").isDirectory()); assertTrue(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).isDirectory()); assertTrue(new File(context.stateDir(), "test/test." 
+ 2 * SEGMENT_INTERVAL).isDirectory()); @@ -107,17 +107,16 @@ public void shouldCreateSegments() { @Test public void shouldNotCreateSegmentThatIsAlreadyExpired() { - updateStreamTimeAndCreateSegment(7); - assertNull(segments.getOrCreateSegmentIfLive(0, context)); + final long streamTime = updateStreamTimeAndCreateSegment(7); + assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime)); assertFalse(new File(context.stateDir(), "test/test.0").exists()); } @Test public void shouldCleanupSegmentsThatHaveExpired() { - final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context); - final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context); - context.setStreamTime(SEGMENT_INTERVAL * 7); - final Segment segment3 = segments.getOrCreateSegmentIfLive(7, context); + final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L); + final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L); + final Segment segment3 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L); assertFalse(segment1.isOpen()); assertFalse(segment2.isOpen()); assertTrue(segment3.isOpen()); @@ -128,22 +127,22 @@ public void shouldCleanupSegmentsThatHaveExpired() { @Test public void shouldGetSegmentForTimestamp() { - final Segment segment = segments.getOrCreateSegmentIfLive(0, context); - segments.getOrCreateSegmentIfLive(1, context); + final Segment segment = segments.getOrCreateSegmentIfLive(0, context, -1L); + segments.getOrCreateSegmentIfLive(1, context, -1L); assertEquals(segment, segments.getSegmentForTimestamp(0L)); } @Test public void shouldGetCorrectSegmentString() { - final Segment segment = segments.getOrCreateSegmentIfLive(0, context); + final Segment segment = segments.getOrCreateSegmentIfLive(0, context, -1L); assertEquals("Segment(id=0, name=test.0)", segment.toString()); } @Test public void shouldCloseAllOpenSegments() { - final Segment first = segments.getOrCreateSegmentIfLive(0, context); - final Segment second = segments.getOrCreateSegmentIfLive(1, context); - final Segment third = segments.getOrCreateSegmentIfLive(2, context); + final Segment first = segments.getOrCreateSegmentIfLive(0, context, -1L); + final Segment second = segments.getOrCreateSegmentIfLive(1, context, -1L); + final Segment third = segments.getOrCreateSegmentIfLive(2, context, -1L); segments.close(); assertFalse(first.isOpen()); @@ -154,16 +153,16 @@ public void shouldCloseAllOpenSegments() { @Test public void shouldOpenExistingSegments() { segments = new Segments("test", 4, 1); - segments.getOrCreateSegmentIfLive(0, context); - segments.getOrCreateSegmentIfLive(1, context); - segments.getOrCreateSegmentIfLive(2, context); - segments.getOrCreateSegmentIfLive(3, context); - segments.getOrCreateSegmentIfLive(4, context); + segments.getOrCreateSegmentIfLive(0, context, -1L); + segments.getOrCreateSegmentIfLive(1, context, -1L); + segments.getOrCreateSegmentIfLive(2, context, -1L); + segments.getOrCreateSegmentIfLive(3, context, -1L); + segments.getOrCreateSegmentIfLive(4, context, -1L); // close existing. 
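// (the reopened Segments below is given -1L, meaning no stream time observed yet: openExisting now takes stream time as an argument rather than reading it from the context)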
segments.close(); segments = new Segments("test", 4, 1); - segments.openExisting(context); + segments.openExisting(context, -1L); assertTrue(segments.getSegmentForTimestamp(0).isOpen()); assertTrue(segments.getSegmentForTimestamp(1).isOpen()); @@ -178,12 +177,12 @@ public void shouldGetSegmentsWithinTimeRange() { updateStreamTimeAndCreateSegment(1); updateStreamTimeAndCreateSegment(2); updateStreamTimeAndCreateSegment(3); - updateStreamTimeAndCreateSegment(4); - segments.getOrCreateSegmentIfLive(0, context); - segments.getOrCreateSegmentIfLive(1, context); - segments.getOrCreateSegmentIfLive(2, context); - segments.getOrCreateSegmentIfLive(3, context); - segments.getOrCreateSegmentIfLive(4, context); + final long streamTime = updateStreamTimeAndCreateSegment(4); + segments.getOrCreateSegmentIfLive(0, context, streamTime); + segments.getOrCreateSegmentIfLive(1, context, streamTime); + segments.getOrCreateSegmentIfLive(2, context, streamTime); + segments.getOrCreateSegmentIfLive(3, context, streamTime); + segments.getOrCreateSegmentIfLive(4, context, streamTime); final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL); assertEquals(3, segments.size()); @@ -235,17 +234,18 @@ public void futureEventsShouldNotCauseSegmentRoll() { verifyCorrectSegments(0, 3); updateStreamTimeAndCreateSegment(3); verifyCorrectSegments(0, 4); - updateStreamTimeAndCreateSegment(4); + final long streamTime = updateStreamTimeAndCreateSegment(4); verifyCorrectSegments(0, 5); - segments.getOrCreateSegmentIfLive(5, context); + segments.getOrCreateSegmentIfLive(5, context, streamTime); verifyCorrectSegments(0, 6); - segments.getOrCreateSegmentIfLive(6, context); + segments.getOrCreateSegmentIfLive(6, context, streamTime); verifyCorrectSegments(0, 7); } - private void updateStreamTimeAndCreateSegment(final int segment) { - context.setStreamTime(SEGMENT_INTERVAL * segment); - segments.getOrCreateSegmentIfLive(segment, context); + private long updateStreamTimeAndCreateSegment(final int segment) { + final long streamTime = SEGMENT_INTERVAL * segment; + segments.getOrCreateSegmentIfLive(segment, context, streamTime); + return streamTime; } @Test @@ -268,7 +268,7 @@ public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exc oldSegment.createNewFile(); } - segments.openExisting(context); + segments.openExisting(context, -1L); for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { final String segmentName = storeName + "." + (long) segmentId * segmentInterval; @@ -290,7 +290,7 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex oldSegment.createNewFile(); } - segments.openExisting(context); + segments.openExisting(context, -1L); for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." 
+ segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1))); @@ -300,7 +300,7 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex @Test public void shouldClearSegmentsOnClose() { - segments.getOrCreateSegmentIfLive(0, context); + segments.getOrCreateSegmentIfLive(0, context, -1L); segments.close(); assertThat(segments.getSegmentForTimestamp(0), is(nullValue())); } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index ddff7a892e2e1..d70c7b90be7ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -16,30 +16,28 @@ */ package org.apache.kafka.streams.tests; -import java.time.Duration; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; +import java.time.Duration; import java.util.Properties; public class SmokeTestClient extends SmokeTestUtil { @@ -115,85 +113,76 @@ private static KafkaStreams createKafkaStreams(final Properties props) { final StreamsBuilder builder = new StreamsBuilder(); final Consumed stringIntConsumed = Consumed.with(stringSerde, intSerde); final KStream source = builder.stream("data", stringIntConsumed); - source.to("echo", Produced.with(stringSerde, intSerde)); - final KStream data = source.filter(new Predicate() { - @Override - public boolean test(final String key, final Integer value) { - return value == null || value != END; - } - }); + source.filterNot((k, v) -> k.equals("flush")) + .to("echo", Produced.with(stringSerde, intSerde)); + final KStream data = source.filter((key, value) -> value == null || value != END); data.process(SmokeTestUtil.printProcessorSupplier("data")); // min final KGroupedStream groupedData = data.groupByKey(Serialized.with(stringSerde, intSerde)); - groupedData - .windowedBy(TimeWindows.of(Duration.ofDays(1))) + final KTable, Integer> minAggregation = groupedData + .windowedBy(TimeWindows.of(Duration.ofDays(1)).grace(Duration.ofMinutes(1))) .aggregate( - new Initializer() { - public Integer apply() { - return Integer.MAX_VALUE; - } - }, - new Aggregator() { - @Override - public Integer apply(final String aggKey, final Integer value, 
final Integer aggregate) { - return (value < aggregate) ? value : aggregate; - } - }, - Materialized.>as("uwin-min").withValueSerde(intSerde)) - .toStream(new Unwindow()) + () -> Integer.MAX_VALUE, + (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate, + Materialized + .>as("uwin-min") + .withValueSerde(intSerde) + .withRetention(Duration.ofHours(25)) + ); + + minAggregation + .toStream() + .filterNot((k, v) -> k.key().equals("flush")) + .map((key, value) -> new KeyValue<>(key.toString(), value)) + .to("min-raw", Produced.with(stringSerde, intSerde)); + + minAggregation + .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) + .toStream() + .filterNot((k, v) -> k.key().equals("flush")) + .map((key, value) -> new KeyValue<>(key.toString(), value)) + .to("min-suppressed", Produced.with(stringSerde, intSerde)); + + minAggregation + .toStream(new Unwindow<>()) + .filterNot((k, v) -> k.equals("flush")) .to("min", Produced.with(stringSerde, intSerde)); final KTable minTable = builder.table( "min", Consumed.with(stringSerde, intSerde), - Materialized.>as("minStoreName")); + Materialized.as("minStoreName")); minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min")); // max groupedData .windowedBy(TimeWindows.of(Duration.ofDays(2))) .aggregate( - new Initializer() { - public Integer apply() { - return Integer.MIN_VALUE; - } - }, - new Aggregator() { - @Override - public Integer apply(final String aggKey, final Integer value, final Integer aggregate) { - return (value > aggregate) ? value : aggregate; - } - }, + () -> Integer.MIN_VALUE, + (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate, Materialized.>as("uwin-max").withValueSerde(intSerde)) - .toStream(new Unwindow()) + .toStream(new Unwindow<>()) + .filterNot((k, v) -> k.equals("flush")) .to("max", Produced.with(stringSerde, intSerde)); final KTable maxTable = builder.table( "max", Consumed.with(stringSerde, intSerde), - Materialized.>as("maxStoreName")); + Materialized.as("maxStoreName")); maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max")); // sum groupedData .windowedBy(TimeWindows.of(Duration.ofDays(2))) .aggregate( - new Initializer() { - public Long apply() { - return 0L; - } - }, - new Aggregator() { - @Override - public Long apply(final String aggKey, final Integer value, final Long aggregate) { - return (long) value + aggregate; - } - }, + () -> 0L, + (aggKey, value, aggregate) -> (long) value + aggregate, Materialized.>as("win-sum").withValueSerde(longSerde)) - .toStream(new Unwindow()) + .toStream(new Unwindow<>()) + .filterNot((k, v) -> k.equals("flush")) .to("sum", Produced.with(stringSerde, longSerde)); final Consumed stringLongConsumed = Consumed.with(stringSerde, longSerde); @@ -203,38 +192,33 @@ public Long apply(final String aggKey, final Integer value, final Long aggregate // cnt groupedData .windowedBy(TimeWindows.of(Duration.ofDays(2))) - .count(Materialized.>as("uwin-cnt")) - .toStream(new Unwindow()) + .count(Materialized.as("uwin-cnt")) + .toStream(new Unwindow<>()) + .filterNot((k, v) -> k.equals("flush")) .to("cnt", Produced.with(stringSerde, longSerde)); final KTable cntTable = builder.table( "cnt", Consumed.with(stringSerde, longSerde), - Materialized.>as("cntStoreName")); + Materialized.as("cntStoreName")); cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt")); // dif maxTable .join( minTable, - new ValueJoiner() { - public Integer apply(final Integer value1, final Integer value2) { - return value1 - 
value2; - } - }) + (value1, value2) -> value1 - value2) .toStream() + .filterNot((k, v) -> k.equals("flush")) .to("dif", Produced.with(stringSerde, intSerde)); // avg sumTable .join( cntTable, - new ValueJoiner() { - public Double apply(final Long value1, final Long value2) { - return (double) value1 / (double) value2; - } - }) + (value1, value2) -> (double) value1 / (double) value2) .toStream() + .filterNot((k, v) -> k.equals("flush")) .to("avg", Produced.with(stringSerde, doubleSerde)); // test repartition diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 7087298d86a57..aef7fa7856952 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -41,6 +41,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -214,6 +215,19 @@ public static Map> generate(final String kafka, } } + // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out + // all suppressed records. + final List partitions = producer.partitionsFor("data"); + for (final PartitionInfo partition : partitions) { + producer.send(new ProducerRecord<>( + partition.topic(), + partition.partition(), + System.currentTimeMillis() + Duration.ofDays(2).toMillis(), + stringSerde.serializer().serialize("", "flush"), + intSerde.serializer().serialize("", 0) + )); + } + producer.close(); return Collections.unmodifiableMap(allData); } @@ -262,7 +276,7 @@ public static void verify(final String kafka, final Map> al props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); final KafkaConsumer consumer = new KafkaConsumer<>(props); - final List partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg"); + final List partitions = getAllPartitions(consumer, "echo", "max", "min", "min-suppressed", "dif", "sum", "cnt", "avg", "wcnt", "tagg"); consumer.assign(partitions); consumer.seekToBeginning(partitions); @@ -271,6 +285,7 @@ public static void verify(final String kafka, final Map> al final HashMap max = new HashMap<>(); final HashMap min = new HashMap<>(); + final HashMap> minSuppressed = new HashMap<>(); final HashMap dif = new HashMap<>(); final HashMap sum = new HashMap<>(); final HashMap cnt = new HashMap<>(); @@ -290,6 +305,7 @@ public static void verify(final String kafka, final Map> al final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); if (records.isEmpty() && recordsProcessed >= recordsGenerated) { if (verifyMin(min, allData, false) + && verifyMinSuppressed(minSuppressed, allData, false) && verifyMax(max, allData, false) && verifyDif(dif, allData, false) && verifySum(sum, allData, false) @@ -316,6 +332,10 @@ && verifyTAgg(tagg, allData, false)) { case "min": min.put(key, intSerde.deserializer().deserialize("", record.value())); break; + case "min-suppressed": + minSuppressed.computeIfAbsent(key, k -> new LinkedList<>()) + .add(intSerde.deserializer().deserialize("", record.value())); + break; case "max": max.put(key, intSerde.deserializer().deserialize("", record.value())); break; @@ -372,6 +392,7 @@ && verifyTAgg(tagg, allData, false)) { } success &= verifyMin(min, allData, true); + success &= verifyMinSuppressed(minSuppressed, 
allData, true); success &= verifyMax(max, allData, true); success &= verifyDif(dif, allData, true); success &= verifySum(sum, allData, true); @@ -412,6 +433,41 @@ private static boolean verifyMin(final Map map, final Map> map, + final Map> allData, + final boolean print) { + if (map.isEmpty()) { + maybePrint(print, "min-suppressed is empty"); + return false; + } else { + maybePrint(print, "verifying min-suppressed"); + + if (map.size() != allData.size()) { + maybePrint(print, "fail: resultCount=" + map.size() + " expectedCount=" + allData.size()); + return false; + } + for (final Map.Entry> entry : map.entrySet()) { + final String key = entry.getKey(); + final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", ""); + final int expected = getMin(unwindowedKey); + if (entry.getValue().size() != 1) { + maybePrint(print, "fail: key=" + entry.getKey() + " non-unique value: " + entry.getValue()); + return false; + } else if (expected != entry.getValue().get(0)) { + maybePrint(print, "fail: key=" + entry.getKey() + " min=" + entry.getValue().get(0) + " expected=" + expected); + return false; + } + } + } + return true; + } + + private static void maybePrint(final boolean print, final String s) { + if (print) { + System.out.println(s); + } + } + private static boolean verifyMax(final Map map, final Map> allData, final boolean print) { if (map.isEmpty()) { if (print) { diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 2f356bff3a133..f483adfa79d1c 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.test; -import java.time.Duration; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -33,10 +32,10 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.AbstractProcessorContext; -import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; import org.apache.kafka.streams.processor.internals.CompositeRestoreListener; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.ToInternal; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -44,6 +43,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.File; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -64,7 +64,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple private Serde keySerde; private Serde valSerde; private long timestamp = -1L; - private long streamTime = -1L; public InternalMockProcessorContext() { this(null, @@ -175,15 +174,6 @@ public Serde valueSerde() { @Override public void initialize() {} - public void setStreamTime(final long currentTime) { - streamTime = currentTime; - } - - @Override - public long streamTime() { - return streamTime; - } 
- @Override public File stateDir() { if (stateDir == null) { diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java index 14f8561030fca..62a8491626594 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java @@ -25,7 +25,6 @@ public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext { private ProcessorNode currentNode; - private long streamTime; @Override public StreamsMetricsImpl metrics() { @@ -72,13 +71,4 @@ public void initialize() { public void uninitialize() { } - - @Override - public long streamTime() { - return streamTime; - } - - public void setStreamTime(final long streamTime) { - this.streamTime = streamTime; - } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java index 36d049c58bad1..650340881606f 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java @@ -94,11 +94,6 @@ public void initialize() { initialized = true; } - @Override - public long streamTime() { - throw new RuntimeException("streamTime is not implemented for NoOpProcessorContext"); - } - @Override public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) {
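With setStreamTime() removed from the mock contexts, a test can advance stream time only the way production code does: by processing a record with a later timestamp. A minimal sketch of the resulting test idiom, modeled on intermediateSuppressionShouldBufferAndEmitLater above and assuming the same harness (context and processor as set up in KTableSuppressProcessorTest):

    // Stream time starts at the first record's timestamp; nothing can be evicted yet.
    context.setRecordMetadata("topic", 0, 0, null, 0L);
    processor.process("hey", new Change<>(null, 1L));
    // observedStreamTime == 0, so the update stays buffered

    // Processing a later-stamped record is now the only way to advance stream time,
    // which in turn lets the suppression buffer evict expired entries.
    context.setRecordMetadata("topic", 0, 1, null, 1L);
    processor.process("tick", new Change<>(null, null));
    // observedStreamTime == 1; records buffered at time 0 are emitted downstream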