From f34ef1596486260d003439ef7571064ff27cc266 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Wed, 20 Feb 2019 09:15:57 -0600 Subject: [PATCH 1/3] KAFKA-7895: fix stream-time reckoning for Suppress (2.2) (#6286) Even within a Task, different Processors have different perceptions of time, due to record caching on stores and in suppression itself, and in general, due to any processor logic that may hold onto records arbitrarily and emit them later. Thanks to this, we can't rely on the whole task existing in the same "instant" of stream-time. The solution is for each processor node that cares about stream-time to track it independently. --- .../KStreamSessionWindowAggregate.java | 5 +- .../internals/KStreamWindowAggregate.java | 5 +- .../suppress/KTableSuppressProcessor.java | 14 ++- .../suppress/SuppressedInternal.java | 39 ++++-- .../internals/GlobalProcessorContextImpl.java | 7 +- .../internals/InternalProcessorContext.java | 2 - .../internals/ProcessorContextImpl.java | 10 -- .../processor/internals/RecordQueue.java | 2 +- .../internals/StandbyContextImpl.java | 12 -- .../processor/internals/StandbyTask.java | 8 +- .../processor/internals/StreamTask.java | 1 - .../internals/TimestampSupplier.java | 21 ---- .../state/internals/AbstractSegments.java | 0 .../internals/RocksDBSegmentedBytesStore.java | 24 +++- .../streams/state/internals/Segments.java | 8 +- ...amSessionWindowAggregateProcessorTest.java | 6 +- .../KTableSuppressProcessorMetricsTest.java | 0 .../suppress/KTableSuppressProcessorTest.java | 19 --- .../AbstractProcessorContextTest.java | 5 - .../GlobalProcessorContextImplTest.java | 0 .../RocksDBSegmentedBytesStoreTest.java | 20 ++- .../internals/RocksDBWindowStoreTest.java | 5 +- .../streams/state/internals/SegmentsTest.java | 72 +++++------ .../kafka/streams/tests/SmokeTestClient.java | 119 ++++++++---------- .../kafka/streams/tests/SmokeTestDriver.java | 58 ++++++++- .../test/InternalMockProcessorContext.java | 14 +-- 
.../test/MockInternalProcessorContext.java | 10 -- .../kafka/test/NoOpProcessorContext.java | 5 - 28 files changed, 246 insertions(+), 245 deletions(-) delete mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index b89399bf7b7bf..f7802d66ad7a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.ProcessorStateException; @@ -82,6 +83,7 @@ private class KStreamSessionWindowAggregateProcessor extends AbstractProcessor, Agg>> merged = new ArrayList<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index f29251573e2de..c5b2483847582 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -16,6 +16,7 @@ */ package 
org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; @@ -75,6 +76,7 @@ private class KStreamWindowAggregateProcessor extends AbstractProcessor { private StreamsMetricsImpl metrics; private InternalProcessorContext internalProcessorContext; private Sensor lateRecordDropSensor; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; @SuppressWarnings("unchecked") @Override @@ -103,7 +105,8 @@ value, context().topic(), context().partition(), context().offset() // first get the matching windows final long timestamp = context().timestamp(); - final long closeTime = internalProcessorContext.streamTime() - windows.gracePeriodMs(); + observedStreamTime = Math.max(observedStreamTime, timestamp); + final long closeTime = observedStreamTime - windows.gracePeriodMs(); final Map matchedWindows = windows.windowsFor(timestamp); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java index 50e74a38fd671..8cf5025c6a6b6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.kstream.internals.suppress; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; @@ -40,14 +42,17 @@ public class KTableSuppressProcessor implements Processor> { private final long suppressDurationMillis; private final TimeDefinition 
bufferTimeDefinition; private final BufferFullStrategy bufferFullStrategy; - private final boolean shouldSuppressTombstones; + private final boolean safeToDropTombstones; private final String storeName; + private TimeOrderedKeyValueBuffer buffer; private InternalProcessorContext internalProcessorContext; private Serde keySerde; private FullChangeSerde valueSerde; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; + public KTableSuppressProcessor(final SuppressedInternal suppress, final String storeName, final Serde keySerde, @@ -61,7 +66,7 @@ public KTableSuppressProcessor(final SuppressedInternal suppress, suppressDurationMillis = suppress.timeToWaitForMoreEvents().toMillis(); bufferTimeDefinition = suppress.timeDefinition(); bufferFullStrategy = suppress.bufferConfig().bufferFullStrategy(); - shouldSuppressTombstones = suppress.shouldSuppressTombstones(); + safeToDropTombstones = suppress.safeToDropTombstones(); } @SuppressWarnings("unchecked") @@ -75,6 +80,7 @@ public void init(final ProcessorContext context) { @Override public void process(final K key, final Change value) { + observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp()); buffer(key, value); enforceConstraints(); } @@ -90,7 +96,7 @@ private void buffer(final K key, final Change value) { } private void enforceConstraints() { - final long streamTime = internalProcessorContext.streamTime(); + final long streamTime = observedStreamTime; final long expiryTime = streamTime - suppressDurationMillis; buffer.evictWhile(() -> buffer.minTimestamp() <= expiryTime, this::emit); @@ -130,7 +136,7 @@ private void emit(final KeyValue toEmit) { } private boolean shouldForward(final Change value) { - return !(value.newValue == null && shouldSuppressTombstones); + return value.newValue != null || !safeToDropTombstones; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java index 74534756b94b4..c3877007c5a42 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedInternal.java @@ -30,23 +30,36 @@ public class SuppressedInternal implements Suppressed { private final BufferConfigInternal bufferConfig; private final Duration timeToWaitForMoreEvents; private final TimeDefinition timeDefinition; - private final boolean suppressTombstones; + private final boolean safeToDropTombstones; + /** + * @param safeToDropTombstones Note: it's *only* safe to drop tombstones for windowed KTables in "final results" mode. + * In that case, we have a priori knowledge that we have never before emitted any + * results for a given key, and therefore the tombstone is unnecessary (albeit + * idempotent and correct). We decided that the unnecessary tombstones would not be + * desirable in the output stream, though, hence the ability to drop them. + * + * An alternative is to remember whether a result has previously been emitted + * for a key and drop tombstones in that case, but it would be a little complicated to + * figure out when to forget the fact that we have emitted some result (currently, the + * buffer immediately forgets all about a key when we emit, which helps to keep it + * compact). + */ public SuppressedInternal(final String name, final Duration suppressionTime, final BufferConfig bufferConfig, final TimeDefinition timeDefinition, - final boolean suppressTombstones) { + final boolean safeToDropTombstones) { this.name = name; this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime; this.timeDefinition = timeDefinition == null ? TimeDefinitions.RecordTimeDefintion.instance() : timeDefinition; this.bufferConfig = bufferConfig == null ?
DEFAULT_BUFFER_CONFIG : (BufferConfigInternal) bufferConfig; - this.suppressTombstones = suppressTombstones; + this.safeToDropTombstones = safeToDropTombstones; } @Override public Suppressed withName(final String name) { - return new SuppressedInternal<>(name, timeToWaitForMoreEvents, bufferConfig, timeDefinition, suppressTombstones); + return new SuppressedInternal<>(name, timeToWaitForMoreEvents, bufferConfig, timeDefinition, safeToDropTombstones); } public String name() { @@ -65,16 +78,20 @@ Duration timeToWaitForMoreEvents() { return timeToWaitForMoreEvents == null ? Duration.ZERO : timeToWaitForMoreEvents; } - boolean shouldSuppressTombstones() { - return suppressTombstones; + boolean safeToDropTombstones() { + return safeToDropTombstones; } @Override public boolean equals(final Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } final SuppressedInternal that = (SuppressedInternal) o; - return suppressTombstones == that.suppressTombstones && + return safeToDropTombstones == that.safeToDropTombstones && Objects.equals(name, that.name) && Objects.equals(bufferConfig, that.bufferConfig) && Objects.equals(timeToWaitForMoreEvents, that.timeToWaitForMoreEvents) && @@ -83,7 +100,7 @@ public boolean equals(final Object o) { @Override public int hashCode() { - return Objects.hash(name, bufferConfig, timeToWaitForMoreEvents, timeDefinition, suppressTombstones); + return Objects.hash(name, bufferConfig, timeToWaitForMoreEvents, timeDefinition, safeToDropTombstones); } @Override @@ -92,7 +109,7 @@ public String toString() { ", bufferConfig=" + bufferConfig + ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents + ", timeDefinition=" + timeDefinition + - ", suppressTombstones=" + suppressTombstones + + ", safeToDropTombstones=" + safeToDropTombstones + '}'; } } diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java index 5c5b84f155d67..a39fafdea6e9a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java @@ -105,9 +105,4 @@ public Cancellable schedule(final long interval, final PunctuationType type, fin public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) { throw new UnsupportedOperationException("this should not happen: schedule() not supported in global processor context."); } - - @Override - public long streamTime() { - throw new RuntimeException("Stream time is not implemented for the global processor context."); - } -} +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java index 98511fd53dec2..0f67dff9fa22c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -63,6 +63,4 @@ public interface InternalProcessorContext extends ProcessorContext { * Mark this context as being uninitialized */ void uninitialize(); - - long streamTime(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 21e1c17a8985f..0ed82decc4d83 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -35,7 +35,6 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re private final StreamTask task; private final RecordCollector collector; - private TimestampSupplier streamTimeSupplier; private final ToInternal toInternal = new ToInternal(); private final static To SEND_TO_ALL = To.all(); @@ -165,13 +164,4 @@ public Cancellable schedule(final Duration interval, return schedule(interval.toMillis(), type, callback); } - void setStreamTimeSupplier(final TimestampSupplier streamTimeSupplier) { - this.streamTimeSupplier = streamTimeSupplier; - } - - @Override - public long streamTime() { - return streamTimeSupplier.get(); - } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index d06d7f3bad654..572e6292ea29e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -36,7 +36,7 @@ */ public class RecordQueue { - static final long UNKNOWN = -1L; + static final long UNKNOWN = ConsumerRecord.NO_TIMESTAMP; private final Logger log; private final SourceNode source; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 6b835d97a3fb1..ee693739dc7aa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -75,8 +75,6 @@ public Map offsets() { } }; - private long streamTime = RecordQueue.UNKNOWN; - StandbyContextImpl(final TaskId id, final StreamsConfig config, final ProcessorStateManager stateMgr, 
@@ -231,14 +229,4 @@ public void setCurrentNode(final ProcessorNode currentNode) { public ProcessorNode currentNode() { throw new UnsupportedOperationException("this should not happen: currentNode not supported in standby tasks."); } - - void updateStreamTime(final long streamTime) { - this.streamTime = Math.max(this.streamTime, streamTime); - } - - @Override - public long streamTime() { - return streamTime; - } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 45f06b2bec563..bd8ba34cb151b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -37,7 +37,6 @@ public class StandbyTask extends AbstractTask { private Map checkpointedOffsets = new HashMap<>(); - private final StandbyContextImpl standbyContext; /** * Create {@link StandbyTask} with its assigned partitions @@ -60,7 +59,7 @@ public class StandbyTask extends AbstractTask { final StateDirectory stateDirectory) { super(id, partitions, topology, consumer, changelogReader, true, stateDirectory, config); - processorContext = standbyContext = new StandbyContextImpl(id, config, stateMgr, metrics); + processorContext = new StandbyContextImpl(id, config, stateMgr, metrics); } @Override @@ -120,7 +119,7 @@ public void suspend() { private void flushAndCheckpointState() { stateMgr.flush(); - stateMgr.checkpoint(Collections.emptyMap()); + stateMgr.checkpoint(Collections.emptyMap()); } /** @@ -177,9 +176,6 @@ public List> update(final TopicPartition partitio if (record.offset() < limit) { restoreRecords.add(record); lastOffset = record.offset(); - // ideally, we'd use the stream time at the time of the change logging, but we'll settle for - // record timestamp for now. 
- standbyContext.updateStreamTime(record.timestamp()); } else { remainingRecords.add(record); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 247a156eb3809..c4fecef4285ac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -236,7 +236,6 @@ public StreamTask(final TaskId id, recordInfo = new PartitionGroup.RecordInfo(); partitionGroup = new PartitionGroup(partitionQueues, recordLatenessSensor(processorContextImpl)); - processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp); stateMgr.registerGlobalStateStores(topology.globalStateStores()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java deleted file mode 100644 index a6a7a42674e29..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TimestampSupplier.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals; - -public interface TimestampSupplier { - long get(); -} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java index 17079b964b505..c4fce72d78094 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStore.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.state.internals; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; @@ -51,6 +52,7 @@ class RocksDBSegmentedBytesStore implements SegmentedBytesStore { private volatile boolean open; private Set bulkLoadSegments; private Sensor expiredRecordSensor; + private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; RocksDBSegmentedBytesStore(final String name, final String metricScope, @@ -108,7 +110,9 @@ public KeyValueIterator fetchAll(final long timeFrom, final long @Override public void remove(final Bytes key) { - final Segment segment = segments.getSegmentForTimestamp(keySchema.segmentTimestamp(key)); + final long timestamp = keySchema.segmentTimestamp(key); + observedStreamTime = Math.max(observedStreamTime, timestamp); + final Segment segment = segments.getSegmentForTimestamp(timestamp); if (segment == null) { return; } @@ -118,8 +122,9 @@ public void remove(final Bytes 
key) { @Override public void put(final Bytes key, final byte[] value) { final long timestamp = keySchema.segmentTimestamp(key); + observedStreamTime = Math.max(observedStreamTime, timestamp); final long segmentId = segments.segmentId(timestamp); - final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context); + final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime); if (segment == null) { expiredRecordSensor.record(); LOG.debug("Skipping record for expired segment."); @@ -163,7 +168,7 @@ public void init(final ProcessorContext context, final StateStore root) { "expired-window-record-drop" ); - segments.openExisting(this.context); + segments.openExisting(this.context, observedStreamTime); bulkLoadSegments = new HashSet<>(segments.allSegments()); @@ -215,10 +220,17 @@ void restoreAllInternal(final Collection> records) { // Visible for testing Map getWriteBatches(final Collection> records) { + // advance stream time to the max timestamp in the batch + for (final KeyValue record : records) { + final long timestamp = keySchema.segmentTimestamp(Bytes.wrap(record.key)); + observedStreamTime = Math.max(observedStreamTime, timestamp); + } + final Map writeBatchMap = new HashMap<>(); for (final KeyValue record : records) { - final long segmentId = segments.segmentId(keySchema.segmentTimestamp(Bytes.wrap(record.key))); - final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context); + final long timestamp = keySchema.segmentTimestamp(Bytes.wrap(record.key)); + final long segmentId = segments.segmentId(timestamp); + final Segment segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime); if (segment != null) { // This handles the case that state store is moved to a new client and does not // have the local RocksDB instance for the segment. 
In this case, toggleDBForBulkLoading @@ -247,7 +259,7 @@ Map getWriteBatches(final Collection value = ARBITRARY_CHANGE; processor.process(key, value); @@ -117,7 +116,6 @@ public void windowedZeroTimeLimitShouldImmediatelyEmit() { final long timestamp = ARBITRARY_LONG; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new TimeWindow(0L, 100L)); final Change value = ARBITRARY_CHANGE; processor.process(key, value); @@ -137,14 +135,12 @@ public void intermediateSuppressionShouldBufferAndEmitLater() { final long timestamp = 0L; context.setRecordMetadata("topic", 0, 0, null, timestamp); - context.setStreamTime(timestamp); final String key = "hey"; final Change value = new Change<>(null, 1L); processor.process(key, value); assertThat(context.forwarded(), hasSize(0)); context.setRecordMetadata("topic", 0, 1, null, 1L); - context.setStreamTime(1L); processor.process("tick", new Change<>(null, null)); assertThat(context.forwarded(), hasSize(1)); @@ -164,7 +160,6 @@ public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() { final long recordTime = 99L; final long windowEnd = 100L; context.setRecordMetadata("topic", 0, 0, null, recordTime); - context.setStreamTime(recordTime); final Windowed key = new Windowed<>("hey", new TimeWindow(windowStart, windowEnd)); final Change value = ARBITRARY_CHANGE; processor.process(key, value); @@ -176,7 +171,6 @@ public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() { final long recordTime2 = 100L; final long windowEnd2 = 101L; context.setRecordMetadata("topic", 0, 1, null, recordTime2); - context.setStreamTime(recordTime2); processor.process(new Windowed<>("dummyKey1", new TimeWindow(windowStart2, windowEnd2)), ARBITRARY_CHANGE); assertThat(context.forwarded(), hasSize(0)); @@ -185,7 +179,6 @@ public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() { final long recordTime3 = 101L; final long windowEnd3 = 
102L; context.setRecordMetadata("topic", 0, 1, null, recordTime3); - context.setStreamTime(recordTime3); processor.process(new Windowed<>("dummyKey2", new TimeWindow(windowStart3, windowEnd3)), ARBITRARY_CHANGE); assertThat(context.forwarded(), hasSize(1)); @@ -212,14 +205,12 @@ public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() { final long streamTime = 99L; final long windowEnd = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(streamTime); final Windowed key = new Windowed<>("hey", new TimeWindow(0, windowEnd)); final Change value = ARBITRARY_CHANGE; processor.process(key, value); assertThat(context.forwarded(), hasSize(0)); context.setRecordMetadata("", 0, 1L, null, windowEnd); - context.setStreamTime(windowEnd); processor.process(new Windowed<>("dummyKey", new TimeWindow(windowEnd, windowEnd + 100L)), ARBITRARY_CHANGE); assertThat(context.forwarded(), hasSize(1)); @@ -237,7 +228,6 @@ public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() { final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new TimeWindow(0, 100L)); final Change value = ARBITRARY_CHANGE; processor.process(key, value); @@ -257,7 +247,6 @@ public void finalResultsShouldSuppressTombstonesForTimeWindows() { final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new TimeWindow(0, 100L)); final Change value = new Change<>(null, ARBITRARY_LONG); processor.process(key, value); @@ -274,7 +263,6 @@ public void finalResultsShouldSuppressTombstonesForSessionWindows() { final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new SessionWindow(0L, 0L)); final Change value = new Change<>(null, ARBITRARY_LONG); 
processor.process(key, value); @@ -291,7 +279,6 @@ public void suppressShouldNotSuppressTombstonesForTimeWindows() { final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new TimeWindow(0L, 100L)); final Change value = new Change<>(null, ARBITRARY_LONG); processor.process(key, value); @@ -311,7 +298,6 @@ public void suppressShouldNotSuppressTombstonesForSessionWindows() { final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final Windowed key = new Windowed<>("hey", new SessionWindow(0L, 0L)); final Change value = new Change<>(null, ARBITRARY_LONG); processor.process(key, value); @@ -331,7 +317,6 @@ public void suppressShouldNotSuppressTombstonesForKTable() { final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); - context.setStreamTime(timestamp); final String key = "hey"; final Change value = new Change<>(null, ARBITRARY_LONG); processor.process(key, value); @@ -350,7 +335,6 @@ public void suppressShouldEmitWhenOverRecordCapacity() { final KTableSuppressProcessor processor = harness.processor; final long timestamp = 100L; - context.setStreamTime(timestamp); context.setRecordMetadata("", 0, 0L, null, timestamp); final String key = "hey"; final Change value = new Change<>(null, ARBITRARY_LONG); @@ -373,7 +357,6 @@ public void suppressShouldEmitWhenOverByteCapacity() { final KTableSuppressProcessor processor = harness.processor; final long timestamp = 100L; - context.setStreamTime(timestamp); context.setRecordMetadata("", 0, 0L, null, timestamp); final String key = "hey"; final Change value = new Change<>(null, ARBITRARY_LONG); @@ -396,7 +379,6 @@ public void suppressShouldShutDownWhenOverRecordCapacity() { final KTableSuppressProcessor processor = harness.processor; final long timestamp = 100L; - context.setStreamTime(timestamp); context.setRecordMetadata("", 0, 
0L, null, timestamp); context.setCurrentNode(new ProcessorNode("testNode")); final String key = "hey"; @@ -420,7 +402,6 @@ public void suppressShouldShutDownWhenOverByteCapacity() { final KTableSuppressProcessor processor = harness.processor; final long timestamp = 100L; - context.setStreamTime(timestamp); context.setRecordMetadata("", 0, 0L, null, timestamp); context.setCurrentNode(new ProcessorNode("testNode")); final String key = "hey"; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 4ce9a9f4f177c..43df1d210239d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -224,10 +224,5 @@ public void forward(final K key, final V value, final String childName) { @Override public void commit() {} - - @Override - public long streamTime() { - throw new RuntimeException("not implemented"); - } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java index ee0069a4072c0..0ee71887f6c50 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSegmentedBytesStoreTest.java @@ -81,6 +81,7 @@ public class RocksDBSegmentedBytesStoreTest { private RocksDBSegmentedBytesStore bytesStore; 
private File stateDir; private final Window[] windows = new Window[4]; + private Window nextSegmentWindow; @Parameter public SegmentedBytesStore.KeySchema schema; @@ -98,12 +99,22 @@ public void before() { windows[1] = new SessionWindow(500L, 1000L); windows[2] = new SessionWindow(1_000L, 1_500L); windows[3] = new SessionWindow(30_000L, 60_000L); + // All four of the previous windows will go into segment 1. + // The nextSegmentWindow is computed be a high enough time that when it gets written + // to the segment store, it will advance stream time past the first segment's retention time and + // expire it. + nextSegmentWindow = new SessionWindow(segmentInterval + retention, segmentInterval + retention); } if (schema instanceof WindowKeySchema) { windows[0] = timeWindowForSize(10L, windowSizeForTimeWindow); windows[1] = timeWindowForSize(500L, windowSizeForTimeWindow); windows[2] = timeWindowForSize(1_000L, windowSizeForTimeWindow); windows[3] = timeWindowForSize(60_000L, windowSizeForTimeWindow); + // All four of the previous windows will go into segment 1. + // The nextSegmentWindow is computed be a high enough time that when it gets written + // to the segment store, it will advance stream time past the first segment's retention time and + // expire it. + nextSegmentWindow = timeWindowForSize(segmentInterval + retention, windowSizeForTimeWindow); } @@ -415,8 +426,13 @@ public void shouldLogAndMeasureExpiredRecords() { LogCaptureAppender.setClassLoggerToDebug(RocksDBSegmentedBytesStore.class); final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(); - context.setStreamTime(Math.max(retention, segmentInterval) * 2); - bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(5)); + // write a record to advance stream time, with a high enough timestamp + // that the subsequent record in windows[0] will already be expired. 
+ bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0)); + + final Bytes key = serializeKey(new Windowed<>("a", windows[0])); + final byte[] value = serializeValue(5); + bytesStore.put(key, value); LogCaptureAppender.unregister(appender); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index c41b094230122..588f12f7b2a72 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -159,7 +159,6 @@ public void shouldOnlyIterateOpenSegments() { private void setCurrentTime(final long currentTime) { context.setRecordContext(createRecordContext(currentTime)); - context.setStreamTime(currentTime); } private ProcessorRecordContext createRecordContext(final long time) { @@ -727,9 +726,11 @@ public void testInitialLoading() { new File(storeDir, segments.segmentName(6L)).mkdir(); windowStore.close(); - context.setStreamTime(segmentInterval * 6L); windowStore = createWindowStore(context, false); + // put something in the store to advance its stream time and expire the old segments + windowStore.put(1, "v", 6L * segmentInterval); + final List expected = Utils.mkList(segments.segmentName(4L), segments.segmentName(5L), segments.segmentName(6L)); expected.sort(String::compareTo); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java index efed24f49e9fe..0c16457b229bd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentsTest.java @@ -94,9 +94,9 @@ public void shouldGetSegmentNameFromId() { @Test public void 
shouldCreateSegments() { - final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context); - final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context); - final Segment segment3 = segments.getOrCreateSegmentIfLive(2, context); + final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L); + final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L); + final Segment segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L); assertTrue(new File(context.stateDir(), "test/test.0").isDirectory()); assertTrue(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).isDirectory()); assertTrue(new File(context.stateDir(), "test/test." + 2 * SEGMENT_INTERVAL).isDirectory()); @@ -107,17 +107,16 @@ public void shouldCreateSegments() { @Test public void shouldNotCreateSegmentThatIsAlreadyExpired() { - updateStreamTimeAndCreateSegment(7); - assertNull(segments.getOrCreateSegmentIfLive(0, context)); + final long streamTime = updateStreamTimeAndCreateSegment(7); + assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime)); assertFalse(new File(context.stateDir(), "test/test.0").exists()); } @Test public void shouldCleanupSegmentsThatHaveExpired() { - final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context); - final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context); - context.setStreamTime(SEGMENT_INTERVAL * 7); - final Segment segment3 = segments.getOrCreateSegmentIfLive(7, context); + final Segment segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L); + final Segment segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L); + final Segment segment3 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L); assertFalse(segment1.isOpen()); assertFalse(segment2.isOpen()); assertTrue(segment3.isOpen()); @@ -128,22 +127,22 @@ public void shouldCleanupSegmentsThatHaveExpired() { @Test public void shouldGetSegmentForTimestamp() { - final Segment segment = 
segments.getOrCreateSegmentIfLive(0, context); - segments.getOrCreateSegmentIfLive(1, context); + final Segment segment = segments.getOrCreateSegmentIfLive(0, context, -1L); + segments.getOrCreateSegmentIfLive(1, context, -1L); assertEquals(segment, segments.getSegmentForTimestamp(0L)); } @Test public void shouldGetCorrectSegmentString() { - final Segment segment = segments.getOrCreateSegmentIfLive(0, context); + final Segment segment = segments.getOrCreateSegmentIfLive(0, context, -1L); assertEquals("Segment(id=0, name=test.0)", segment.toString()); } @Test public void shouldCloseAllOpenSegments() { - final Segment first = segments.getOrCreateSegmentIfLive(0, context); - final Segment second = segments.getOrCreateSegmentIfLive(1, context); - final Segment third = segments.getOrCreateSegmentIfLive(2, context); + final Segment first = segments.getOrCreateSegmentIfLive(0, context, -1L); + final Segment second = segments.getOrCreateSegmentIfLive(1, context, -1L); + final Segment third = segments.getOrCreateSegmentIfLive(2, context, -1L); segments.close(); assertFalse(first.isOpen()); @@ -154,16 +153,16 @@ public void shouldCloseAllOpenSegments() { @Test public void shouldOpenExistingSegments() { segments = new Segments("test", 4, 1); - segments.getOrCreateSegmentIfLive(0, context); - segments.getOrCreateSegmentIfLive(1, context); - segments.getOrCreateSegmentIfLive(2, context); - segments.getOrCreateSegmentIfLive(3, context); - segments.getOrCreateSegmentIfLive(4, context); + segments.getOrCreateSegmentIfLive(0, context, -1L); + segments.getOrCreateSegmentIfLive(1, context, -1L); + segments.getOrCreateSegmentIfLive(2, context, -1L); + segments.getOrCreateSegmentIfLive(3, context, -1L); + segments.getOrCreateSegmentIfLive(4, context, -1L); // close existing. 
segments.close(); segments = new Segments("test", 4, 1); - segments.openExisting(context); + segments.openExisting(context, -1L); assertTrue(segments.getSegmentForTimestamp(0).isOpen()); assertTrue(segments.getSegmentForTimestamp(1).isOpen()); @@ -178,12 +177,12 @@ public void shouldGetSegmentsWithinTimeRange() { updateStreamTimeAndCreateSegment(1); updateStreamTimeAndCreateSegment(2); updateStreamTimeAndCreateSegment(3); - updateStreamTimeAndCreateSegment(4); - segments.getOrCreateSegmentIfLive(0, context); - segments.getOrCreateSegmentIfLive(1, context); - segments.getOrCreateSegmentIfLive(2, context); - segments.getOrCreateSegmentIfLive(3, context); - segments.getOrCreateSegmentIfLive(4, context); + final long streamTime = updateStreamTimeAndCreateSegment(4); + segments.getOrCreateSegmentIfLive(0, context, streamTime); + segments.getOrCreateSegmentIfLive(1, context, streamTime); + segments.getOrCreateSegmentIfLive(2, context, streamTime); + segments.getOrCreateSegmentIfLive(3, context, streamTime); + segments.getOrCreateSegmentIfLive(4, context, streamTime); final List segments = this.segments.segments(0, 2 * SEGMENT_INTERVAL); assertEquals(3, segments.size()); @@ -235,17 +234,18 @@ public void futureEventsShouldNotCauseSegmentRoll() { verifyCorrectSegments(0, 3); updateStreamTimeAndCreateSegment(3); verifyCorrectSegments(0, 4); - updateStreamTimeAndCreateSegment(4); + final long streamTime = updateStreamTimeAndCreateSegment(4); verifyCorrectSegments(0, 5); - segments.getOrCreateSegmentIfLive(5, context); + segments.getOrCreateSegmentIfLive(5, context, streamTime); verifyCorrectSegments(0, 6); - segments.getOrCreateSegmentIfLive(6, context); + segments.getOrCreateSegmentIfLive(6, context, streamTime); verifyCorrectSegments(0, 7); } - private void updateStreamTimeAndCreateSegment(final int segment) { - context.setStreamTime(SEGMENT_INTERVAL * segment); - segments.getOrCreateSegmentIfLive(segment, context); + private long updateStreamTimeAndCreateSegment(final int 
segment) { + final long streamTime = SEGMENT_INTERVAL * segment; + segments.getOrCreateSegmentIfLive(segment, context, streamTime); + return streamTime; } @Test @@ -268,7 +268,7 @@ public void shouldUpdateSegmentFileNameFromOldDateFormatToNewFormat() throws Exc oldSegment.createNewFile(); } - segments.openExisting(context); + segments.openExisting(context, -1L); for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { final String segmentName = storeName + "." + (long) segmentId * segmentInterval; @@ -290,7 +290,7 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex oldSegment.createNewFile(); } - segments.openExisting(context); + segments.openExisting(context, -1L); for (int segmentId = 0; segmentId < NUM_SEGMENTS; ++segmentId) { final File newSegment = new File(storeDirectoryPath + File.separator + storeName + "." + segmentId * (RETENTION_PERIOD / (NUM_SEGMENTS - 1))); @@ -300,7 +300,7 @@ public void shouldUpdateSegmentFileNameFromOldColonFormatToNewFormat() throws Ex @Test public void shouldClearSegmentsOnClose() { - segments.getOrCreateSegmentIfLive(0, context); + segments.getOrCreateSegmentIfLive(0, context, -1L); segments.close(); assertThat(segments.getSegmentForTimestamp(0), is(nullValue())); } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index ddff7a892e2e1..d7f9b6c2733c6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -16,30 +16,31 @@ */ package org.apache.kafka.streams.tests; -import java.time.Duration; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Consumed; import 
org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; +import java.time.Duration; import java.util.Properties; public class SmokeTestClient extends SmokeTestUtil { @@ -115,85 +116,76 @@ private static KafkaStreams createKafkaStreams(final Properties props) { final StreamsBuilder builder = new StreamsBuilder(); final Consumed stringIntConsumed = Consumed.with(stringSerde, intSerde); final KStream source = builder.stream("data", stringIntConsumed); - source.to("echo", Produced.with(stringSerde, intSerde)); - final KStream data = source.filter(new Predicate() { - @Override - public boolean test(final String key, final Integer value) { - return value == null || value != END; - } - }); + source.filterNot((k, v) -> k.equals("flush")) + .to("echo", Produced.with(stringSerde, intSerde)); + final KStream data = source.filter((key, value) -> value == null || value != END); data.process(SmokeTestUtil.printProcessorSupplier("data")); // min 
final KGroupedStream groupedData = data.groupByKey(Serialized.with(stringSerde, intSerde)); - groupedData - .windowedBy(TimeWindows.of(Duration.ofDays(1))) + final KTable, Integer> minAggregation = groupedData + .windowedBy(TimeWindows.of(Duration.ofDays(1)).grace(Duration.ofMinutes(1))) .aggregate( - new Initializer() { - public Integer apply() { - return Integer.MAX_VALUE; - } - }, - new Aggregator() { - @Override - public Integer apply(final String aggKey, final Integer value, final Integer aggregate) { - return (value < aggregate) ? value : aggregate; - } - }, - Materialized.>as("uwin-min").withValueSerde(intSerde)) - .toStream(new Unwindow()) + () -> Integer.MAX_VALUE, + (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate, + Materialized + .>as("uwin-min") + .withValueSerde(intSerde) + .withRetention(Duration.ofHours(25)) + ); + + minAggregation + .toStream() + .filterNot((k, v) -> k.key().equals("flush")) + .map((key, value) -> new KeyValue<>(key.toString(), value)) + .to("min-raw", Produced.with(stringSerde, intSerde)); + + minAggregation + .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())) + .toStream() + .filterNot((k, v) -> k.key().equals("flush")) + .map((key, value) -> new KeyValue<>(key.toString(), value)) + .to("min-suppressed", Produced.with(stringSerde, intSerde)); + + minAggregation + .toStream(new Unwindow<>()) + .filterNot((k, v) -> k.equals("flush")) .to("min", Produced.with(stringSerde, intSerde)); final KTable minTable = builder.table( "min", Consumed.with(stringSerde, intSerde), - Materialized.>as("minStoreName")); + Materialized.as("minStoreName")); minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min")); // max groupedData .windowedBy(TimeWindows.of(Duration.ofDays(2))) .aggregate( - new Initializer() { - public Integer apply() { - return Integer.MIN_VALUE; - } - }, - new Aggregator() { - @Override - public Integer apply(final String aggKey, final Integer value, final Integer 
aggregate) { - return (value > aggregate) ? value : aggregate; - } - }, + () -> Integer.MIN_VALUE, + (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate, Materialized.>as("uwin-max").withValueSerde(intSerde)) - .toStream(new Unwindow()) + .toStream(new Unwindow<>()) + .filterNot((k, v) -> k.equals("flush")) .to("max", Produced.with(stringSerde, intSerde)); final KTable maxTable = builder.table( "max", Consumed.with(stringSerde, intSerde), - Materialized.>as("maxStoreName")); + Materialized.as("maxStoreName")); maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max")); // sum groupedData .windowedBy(TimeWindows.of(Duration.ofDays(2))) .aggregate( - new Initializer() { - public Long apply() { - return 0L; - } - }, - new Aggregator() { - @Override - public Long apply(final String aggKey, final Integer value, final Long aggregate) { - return (long) value + aggregate; - } - }, + () -> 0L, + (aggKey, value, aggregate) -> (long) value + aggregate, Materialized.>as("win-sum").withValueSerde(longSerde)) - .toStream(new Unwindow()) + .toStream(new Unwindow<>()) + .filterNot((k, v) -> k.equals("flush")) .to("sum", Produced.with(stringSerde, longSerde)); final Consumed stringLongConsumed = Consumed.with(stringSerde, longSerde); @@ -203,38 +195,33 @@ public Long apply(final String aggKey, final Integer value, final Long aggregate // cnt groupedData .windowedBy(TimeWindows.of(Duration.ofDays(2))) - .count(Materialized.>as("uwin-cnt")) - .toStream(new Unwindow()) + .count(Materialized.as("uwin-cnt")) + .toStream(new Unwindow<>()) + .filterNot((k, v) -> k.equals("flush")) .to("cnt", Produced.with(stringSerde, longSerde)); final KTable cntTable = builder.table( "cnt", Consumed.with(stringSerde, longSerde), - Materialized.>as("cntStoreName")); + Materialized.as("cntStoreName")); cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt")); // dif maxTable .join( minTable, - new ValueJoiner() { - public Integer apply(final Integer value1, 
final Integer value2) { - return value1 - value2; - } - }) + (value1, value2) -> value1 - value2) .toStream() + .filterNot((k, v) -> k.equals("flush")) .to("dif", Produced.with(stringSerde, intSerde)); // avg sumTable .join( cntTable, - new ValueJoiner() { - public Double apply(final Long value1, final Long value2) { - return (double) value1 / (double) value2; - } - }) + (value1, value2) -> (double) value1 / (double) value2) .toStream() + .filterNot((k, v) -> k.equals("flush")) .to("avg", Produced.with(stringSerde, doubleSerde)); // test repartition diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java index 7087298d86a57..aef7fa7856952 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java @@ -41,6 +41,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -214,6 +215,19 @@ public static Map> generate(final String kafka, } } + // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out + // all suppressed records. 
+ final List partitions = producer.partitionsFor("data"); + for (final PartitionInfo partition : partitions) { + producer.send(new ProducerRecord<>( + partition.topic(), + partition.partition(), + System.currentTimeMillis() + Duration.ofDays(2).toMillis(), + stringSerde.serializer().serialize("", "flush"), + intSerde.serializer().serialize("", 0) + )); + } + producer.close(); return Collections.unmodifiableMap(allData); } @@ -262,7 +276,7 @@ public static void verify(final String kafka, final Map> al props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); final KafkaConsumer consumer = new KafkaConsumer<>(props); - final List partitions = getAllPartitions(consumer, "echo", "max", "min", "dif", "sum", "cnt", "avg", "wcnt", "tagg"); + final List partitions = getAllPartitions(consumer, "echo", "max", "min", "min-suppressed", "dif", "sum", "cnt", "avg", "wcnt", "tagg"); consumer.assign(partitions); consumer.seekToBeginning(partitions); @@ -271,6 +285,7 @@ public static void verify(final String kafka, final Map> al final HashMap max = new HashMap<>(); final HashMap min = new HashMap<>(); + final HashMap> minSuppressed = new HashMap<>(); final HashMap dif = new HashMap<>(); final HashMap sum = new HashMap<>(); final HashMap cnt = new HashMap<>(); @@ -290,6 +305,7 @@ public static void verify(final String kafka, final Map> al final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); if (records.isEmpty() && recordsProcessed >= recordsGenerated) { if (verifyMin(min, allData, false) + && verifyMinSuppressed(minSuppressed, allData, false) && verifyMax(max, allData, false) && verifyDif(dif, allData, false) && verifySum(sum, allData, false) @@ -316,6 +332,10 @@ && verifyTAgg(tagg, allData, false)) { case "min": min.put(key, intSerde.deserializer().deserialize("", record.value())); break; + case "min-suppressed": + minSuppressed.computeIfAbsent(key, k -> new LinkedList<>()) + .add(intSerde.deserializer().deserialize("", 
record.value())); + break; case "max": max.put(key, intSerde.deserializer().deserialize("", record.value())); break; @@ -372,6 +392,7 @@ && verifyTAgg(tagg, allData, false)) { } success &= verifyMin(min, allData, true); + success &= verifyMinSuppressed(minSuppressed, allData, true); success &= verifyMax(max, allData, true); success &= verifyDif(dif, allData, true); success &= verifySum(sum, allData, true); @@ -412,6 +433,41 @@ private static boolean verifyMin(final Map map, final Map> map, + final Map> allData, + final boolean print) { + if (map.isEmpty()) { + maybePrint(print, "min-suppressed is empty"); + return false; + } else { + maybePrint(print, "verifying min-suppressed"); + + if (map.size() != allData.size()) { + maybePrint(print, "fail: resultCount=" + map.size() + " expectedCount=" + allData.size()); + return false; + } + for (final Map.Entry> entry : map.entrySet()) { + final String key = entry.getKey(); + final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", ""); + final int expected = getMin(unwindowedKey); + if (entry.getValue().size() != 1) { + maybePrint(print, "fail: key=" + entry.getKey() + " non-unique value: " + entry.getValue()); + return false; + } else if (expected != entry.getValue().get(0)) { + maybePrint(print, "fail: key=" + entry.getKey() + " min=" + entry.getValue().get(0) + " expected=" + expected); + return false; + } + } + } + return true; + } + + private static void maybePrint(final boolean print, final String s) { + if (print) { + System.out.println(s); + } + } + private static boolean verifyMax(final Map map, final Map> allData, final boolean print) { if (map.isEmpty()) { if (print) { diff --git a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java index 2f356bff3a133..f483adfa79d1c 100644 --- a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java +++ 
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.test; -import java.time.Duration; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; @@ -33,10 +32,10 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.AbstractProcessorContext; -import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; import org.apache.kafka.streams.processor.internals.CompositeRestoreListener; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.streams.processor.internals.ToInternal; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; @@ -44,6 +43,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.File; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -64,7 +64,6 @@ public class InternalMockProcessorContext extends AbstractProcessorContext imple private Serde keySerde; private Serde valSerde; private long timestamp = -1L; - private long streamTime = -1L; public InternalMockProcessorContext() { this(null, @@ -175,15 +174,6 @@ public Serde valueSerde() { @Override public void initialize() {} - public void setStreamTime(final long currentTime) { - streamTime = currentTime; - } - - @Override - public long streamTime() { - return streamTime; - } - @Override public File stateDir() { if (stateDir == null) { diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java index 14f8561030fca..62a8491626594 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java @@ -25,7 +25,6 @@ public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext { private ProcessorNode currentNode; - private long streamTime; @Override public StreamsMetricsImpl metrics() { @@ -72,13 +71,4 @@ public void initialize() { public void uninitialize() { } - - @Override - public long streamTime() { - return streamTime; - } - - public void setStreamTime(final long streamTime) { - this.streamTime = streamTime; - } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java index 36d049c58bad1..650340881606f 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java @@ -94,11 +94,6 @@ public void initialize() { initialized = true; } - @Override - public long streamTime() { - throw new RuntimeException("streamTime is not implemented for NoOpProcessorContext"); - } - @Override public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) { From 156dee77f3dbd589fd27cf2cd240867fb3231431 Mon Sep 17 00:00:00 2001 From: John Roesler Date: Mon, 4 Mar 2019 14:20:21 -0600 Subject: [PATCH 2/3] fix style --- .../kstream/internals/suppress/KTableSuppressProcessor.java | 1 - .../apache/kafka/streams/state/internals/AbstractSegments.java | 0 2 files changed, 1 deletion(-) delete mode 100644 streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java index 8cf5025c6a6b6..4058083dc1bdc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.kstream.internals.suppress; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java deleted file mode 100644 index e69de29bb2d1d..0000000000000 From dcacd02de53992cd6f1100f3461c4cdd82a2962c Mon Sep 17 00:00:00 2001 From: John Roesler Date: Mon, 4 Mar 2019 18:25:19 -0600 Subject: [PATCH 3/3] fix cherry-pick problems --- .../internals/suppress/KTableSuppressProcessorMetricsTest.java | 0 .../processor/internals/GlobalProcessorContextImplTest.java | 0 .../java/org/apache/kafka/streams/tests/SmokeTestClient.java | 3 --- 3 files changed, 3 deletions(-) delete mode 100644 streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java delete mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorMetricsTest.java deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index d7f9b6c2733c6..d70c7b90be7ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -24,10 +24,7 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.Aggregator; -import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable;