KAFKA-7895: Fix stream-time reckoning for suppress (#6278)
mjsax merged 4 commits into apache:trunk from vvcephei:KAFKA-7895-fix-suppress
Conversation
* Add suppress to system tests
* Move stream-time reckoning from Task into Processor

Even within a Task, different Processors have different perceptions of time, due to record caching on stores and in suppression itself, and in general, due to any processor logic that may hold onto records arbitrarily and emit them later. Thanks to this, we can't rely on the whole task existing in the same "instant" of stream-time. The solution is for each processor node that cares about stream-time to track it independently.
@ableegoldman @mjsax @bbejeck @guozhangwang, please take a look at this fix for suppression. As I mentioned in the description, it was incorrect for a processor to use the task's view of stream time, because a processor within a task may have an arbitrarily delayed local view of time when it has an upstream record cache or suppression. This was actually always true (since I introduced stream time into the context), but it didn't manifest as a bug until suppression, which depends very strictly on the stream time. The correct thing to do is track observed stream time locally in each processor node. Since not every processor needs stream time, and since it was never in the ProcessorContext public API, I opted to remove it from the ProcessorContext and just explicitly implement it in the nodes that need it. If we wish to expose "stream time" to processors in general, we can instead track stream time in ProcessorNode and then wrap the ProcessorContext to supply the local stream time when injecting the context into each processor. Ideally, we can merge this fix into 2.2 as well.
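The per-node tracking described above boils down to a monotonic max over the timestamps a node actually observes. A minimal sketch of the pattern (illustrative only; `LocalStreamTime` and `observe` are hypothetical names, not the actual Kafka Streams internals):

```java
// Illustrative sketch of per-processor stream-time tracking; this is NOT
// the real Kafka Streams implementation, just the pattern it follows.
public class LocalStreamTime {
    // Sentinel for "no record observed yet" (Kafka uses
    // ConsumerRecord.NO_TIMESTAMP, which is -1L).
    private long observedStreamTime = -1L;

    // Called for each record that actually flows through this node.
    // Stream time is the max timestamp observed so far, so it never goes
    // backwards, even when late records arrive.
    public long observe(final long recordTimestamp) {
        if (recordTimestamp > observedStreamTime) {
            observedStreamTime = recordTimestamp;
        }
        return observedStreamTime;
    }
}
```

Because each node calls this only for records it receives, a node downstream of a cache or suppression sees time advance at its own (possibly delayed) pace, rather than borrowing the task's "future" clock.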
I guess it should be noted that the same condition could cause segments to be prematurely expired and records to be prematurely rejected from windowed aggregations due to grace-period restrictions, if the windowed aggregation were downstream of a cache or suppression. It just happens that suppression was the first thing that people noticed (probably due to the long default retention and grace period).

Java 11 failure was a Kafka admin test. Apparently, Gradle crashed in the Java 8 build! Retest this, please.

https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2402/ Java 8 timed out (cf. https://issues.apache.org/jira/browse/KAFKA-7933).
  private StreamsMetricsImpl metrics;
  private InternalProcessorContext internalProcessorContext;
  private Sensor lateRecordDropSensor;
+ private long observedStreamTime = -1L;
nit: use ConsumerRecord.NO_TIMESTAMP instead of -1L (same below)
Hmm, should we use Long.MIN_VALUE instead, to account for negative timestamps?
We don't need a sentinel value; we just need something that's not larger than a real record's timestamp.
So far, we don't support negative timestamps, so it seems cleanest to use NO_TIMESTAMP for now -- if we ever introduce negative timestamps, we need to update the code anyway.
Sounds good. It's updated.
      }
  };

+ private long streamTime = RecordQueue.UNKNOWN;
Can you fix RecordQueue.UNKNOWN as a side thing, too?
You mean, set it to ConsumerRecord.NO_TIMESTAMP, right?
  @Override
- public S getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context) {
-     final long minLiveTimestamp = context.streamTime() - retentionPeriod;
+ public S getOrCreateSegmentIfLive(final long segmentId, final InternalProcessorContext context, final long streamTime) {
nit: break parameters into single lines
- segment.write(batch);
+ if (segment.open) {
+     segment.write(batch);
+ }
Can you elaborate? Seems to be orthogonal to the timestamp fix.
The segmented store expires segments based on stream time.
Previously, the stream time it used was coming from the ProcessorContext, which doesn't get updated while we're processing this batch write, so the whole method execution happened in one "instant". That instant would be at least the latest time in the batch. So when it loaded all the segments in building batchWriteMap, it was already able to tell which records were for expired segments and drop them.
Now, though, it tracks stream time based on the sequence of received writes, so at the beginning of this method call, the local stream time is maybe at the beginning of the batch. This means that in the beginning of this method, it creates/gets segments for everything in the batch (not updating the stream time at all), so all the segments are still live until we enter the loop over writeBatchMap.
When we loop over the batch map, we're actually processing the writes, which advances the local view of stream time, possibly expiring some segments. Since stream-time is actually advancing during the loop, and segments are being expired as the loop executes, we can no longer expect all our segments to be open here.
Thanks for the details. Should we instead do a "clean up" within getWriteBatches() and only return open segments? Seems to be a better separation of concerns.
Hah. Ok, that's what I get for commenting from memory. It turns out that we actually do advance time within getWriteBatches. The problem is actually that in that method, we're advancing time while we're also creating the segments and populating the result list. So, by the end of the iteration, some of the segments at the beginning of the result list may already be expired.
We can fix it by either pre-iterating over all the records to advance time to the largest timestamp in the batch or post-iterating over the results to remove the expired segments.
I opted for the pre-iteration so that we can potentially avoid creating and opening a segment that would just need to be closed and cleaned up later in the same batch.
The code is cleaned up now. Thanks for the catch!
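The pre-iteration idea discussed in this thread can be sketched as follows. This is a hypothetical distillation, not the real RocksDBSegmentedBytesStore#getWriteBatches code; the class and method names here are illustrative:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of "pre-iterate to advance stream time first": advance the local
// stream time over the whole batch BEFORE deciding which writes belong to
// still-live segments, so we never open a segment that the same batch
// would immediately expire. Names are illustrative, not Kafka's API.
public class BatchStreamTimeSketch {
    private long observedStreamTime = -1L;
    private final long retentionPeriod;

    public BatchStreamTimeSketch(final long retentionPeriod) {
        this.retentionPeriod = retentionPeriod;
    }

    public List<Long> liveTimestamps(final List<Long> batch) {
        // First pass: advance stream time to the batch maximum.
        for (final long ts : batch) {
            observedStreamTime = Math.max(observedStreamTime, ts);
        }
        // Second pass: with the final stream time known, only writes whose
        // timestamps are within the retention window are kept.
        final long minLiveTimestamp = observedStreamTime - retentionPeriod;
        final List<Long> live = new ArrayList<>();
        for (final long ts : batch) {
            if (ts >= minLiveTimestamp) {
                live.add(ts); // the real store would get-or-create a live segment here
            }
        }
        return live;
    }
}
```

With the post-iteration alternative, the first loop would create segments for every write and a final sweep would close and remove the expired ones; the pre-iteration version avoids ever opening them.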
  context.setStreamTime(Math.max(retention, segmentInterval) * 2);
  bytesStore.put(serializeKey(new Windowed<>("a", windows[0])), serializeValue(5));
  bytesStore.put(serializeKey(new Windowed<>("dummy", nextSegmentWindow)), serializeValue(0));
Is this to "set" a specific observedStreamTime? Might need a comment.
yes, it's to set the stream time to a high enough value that it expires the previous segment... I'll add a comment.
  LogCaptureAppender.setClassLoggerToDebug(RocksDBSegmentedBytesStore.class);
  final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();

  context.setStreamTime(Math.max(retention, segmentInterval) * 2);
I can't map this to the nextSegmentWindow bounds above. Can you elaborate?
The new code is a tighter bound. Previously we used 2 * max(retention, interval), but all we really need is retention + interval. I'm not sure why I wrote the previous code; 2 * max(retention, interval) is guaranteed to be at least retention + interval, but it's pretty esoteric...
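To make the bound concrete (the numbers below are made up for illustration, not taken from the actual test): a record at time 0 lands in the segment covering [0, segmentInterval), and that segment expires once streamTime - retention reaches the segment's end. So retention + segmentInterval is the tightest advance that forces expiry, while the old 2 * max(retention, segmentInterval) is always at least that large:

```java
// Illustrative arithmetic only; values are hypothetical.
public class ExpiryBoundSketch {
    public static void main(final String[] args) {
        final long retention = 1000L;
        final long segmentInterval = 300L;

        // Old test value: guaranteed big enough, but looser than needed.
        final long oldStreamTime = Math.max(retention, segmentInterval) * 2; // 2000
        // Tighter bound: exactly when the segment [0, segmentInterval) expires.
        final long newStreamTime = retention + segmentInterval;              // 1300

        // The segment holding a record at time 0 is expired once
        // streamTime - retention >= segmentInterval.
        System.out.println(newStreamTime - retention >= segmentInterval); // true
        System.out.println(oldStreamTime >= newStreamTime);               // true
    }
}
```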
Thanks for the review, @mjsax. I think I've addressed your comments.
bbejeck left a comment
Thanks for the patch, @vvcephei. I've made one pass and I have one question regarding the test. The smoke test covers the scenario of a time window for a 24-hour period with a grace period of one minute, then sends records way beyond the time bounds to force a flush.
That makes sense, but can we also test with tighter time bounds, with windows of, say, 30 seconds, and use system time without adding any extra time for verification?
I think that's a good idea, @bbejeck. I'm thinking this could be an integration test, to avoid adding too much onto the smoke test; do you agree? Actually, after wrestling with this for a bit, it's more complicated than I thought. What do you think about tackling that in a separate PR? I went ahead and added it to the acceptance criteria of KAFKA-7895. There's also a task there to add system test coverage with caching disabled, so this PR wouldn't have resolved the ticket anyway.

Failures unrelated: https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2414/ Retest this, please.

Java 11 had one unrelated failure:

That plaintext consumer test was the only failure. I'm not sure if there's value in repeating the tests at this point.

https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2424/ Retest this, please.

Hey, it passed that time! Thanks, @mjsax.
Yes, I actually was thinking of an integration test and just wasn't being clear about it.
That's fine.
bbejeck left a comment
Thanks for the patch, @vvcephei. Overall it looks good; I just wanted to ask a clarifying question.
Would it be fair to say the new approach "smooths" out the representation of time per processor, in that it can only ever increase or, at a minimum, stay the same?
The process of only updating the observedStreamTime when the internalProcessorContext#timestamp() is greater than the last observed time is more or less the same approach taken in RecordQueue#maybeUpdateTimestamp(), correct?
Looks good, thanks @vvcephei

Thanks, @bbejeck. Yes, for the processors that track stream time, the definition of time is essentially the same as it is for the record queue: it's always the maximum record timestamp ever observed, so, as you say, it will always increase or stay the same.

It used to be true that (when there's one or more upstream caches or suppressions in the same task) we'd always see records arriving from the past. E.g., the clock would say 10:00, but I'd be getting records from 09:55. Now, we've made stream time tighter, so the clock will say 09:55 when we're getting records for 09:55.

Another change is that, for the processors tracking stream time, time only advances for records that actually flow through the processor, so we won't see the clock continuing to "tick" as a result of records that actually get filtered out before they get to us. I think this is good for predictability. That is, it's a simpler mental model, so it's easier to predict how the windowed aggs and suppression will behave wrt stream time. I think it makes sense to say that this smooths out the representation of stream time.
Thanks @mbragg02!

@vvcephei This PR has conflicts with

Merged to

Thanks, @mjsax! I will do so now.
* ak/trunk: (45 commits)
  KAFKA-7487: DumpLogSegments misreports offset mismatches (apache#5756)
  MINOR: improve JavaDocs about auto-repartitioning in Streams DSL (apache#6269)
  KAFKA-7935: UNSUPPORTED_COMPRESSION_TYPE if ReplicaManager.getLogConfig returns None (apache#6274)
  KAFKA-7895: Fix stream-time reckoning for suppress (apache#6278)
  KAFKA-6569: Move OffsetIndex/TimeIndex logger to companion object (apache#4586)
  MINOR: add log indicating the suppression time (apache#6260)
  MINOR: Make info logs for KafkaConsumer a bit more verbose (apache#6279)
  KAFKA-7758: Reuse KGroupedStream/KGroupedTable with named repartition topics (apache#6265)
  KAFKA-7884; Docs for message.format.version should display valid values (apache#6209)
  MINOR: Save failed test output to build output directory
  MINOR: add test for StreamsSmokeTestDriver (apache#6231)
  MINOR: Fix bugs identified by compiler warnings (apache#6258)
  KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 4] (apache#5433)
  MINOR: fix bypasses in ChangeLogging stores (apache#6266)
  MINOR: Make MockClient#poll() more thread-safe (apache#5942)
  MINOR: drop dbAccessor reference on close (apache#6254)
  KAFKA-7811: Avoid unnecessary lock acquire when KafkaConsumer commits offsets (apache#6119)
  KAFKA-7916: Unify store wrapping code for clarity (apache#6255)
  MINOR: Add missing Alter Operation to Topic supported operations list in AclCommand
  KAFKA-7921: log at error level for missing source topic (apache#6262)
  ...
Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
Even within a Task, different Processors have different perceptions
of time, due to record caching on stores and in suppression itself,
and in general, due to any processor logic that may hold onto
records arbitrarily and emit them later. Thanks to this, we can't rely
on the whole task existing in the same "instant" of stream-time. The
solution is for each processor node that cares about stream-time to
track it independently.