MINOR: enforce non-negative invariant for checkpointed offsets#8297
Conversation
| if (offset >= 0L) { | ||
| offsets.put(tp, offset); | ||
| } else { | ||
| LOG.warn("Read offset={} from checkpoint file for {}", offset, tp); |
There was a problem hiding this comment.
Since we may have older checkpoint files that wrote negative offsets, we should just ignore these on read
| if (metadata.offset() >= 0L) { | ||
| offsets.put(tp, metadata.offset()); | ||
| } else { | ||
| log.warn("Received offset={} in produce response for {}", metadata.offset(), tp); |
There was a problem hiding this comment.
Skip writing/overwriting the offset for this partition, at worst we will have to restore the last batch of records unnecessarily.
There was a problem hiding this comment.
Did you observe that offset returned from the param could be smaller than zero? I think this should never happen from producer callbacks.
There was a problem hiding this comment.
No, but Jason said he thought it may be possible in some edge (but non-error) cases. There's no guarantee a successful send will never return a negative offset
|
call for review @vvcephei @cadonna @guozhangwang |
guozhangwang
left a comment
There was a problem hiding this comment.
Overall lgtm, just curious if you have observed this happening in KIP-441?
| if (metadata.offset() >= 0L) { | ||
| offsets.put(tp, metadata.offset()); | ||
| } else { | ||
| log.warn("Received offset={} in produce response for {}", metadata.offset(), tp); |
There was a problem hiding this comment.
Did you observe that offset returned from the param could be smaller than zero? I think this should never happen from producer callbacks.
|
test this please |
|
Merged to trunk. |
* apache-github/trunk: (39 commits) MINOR: cleanup and add tests to StateDirectoryTest (apache#8304) HOTFIX: StateDirectoryTest should use Set instead of List (apache#8305) MINOR: Fix build and JavaDoc warnings (apache#8291) MINOR: Fix kafka.server.RequestQuotaTest missing new ApiKeys. (apache#8302) KAFKA-9712: Catch and handle exception thrown by reflections scanner (apache#8289) KAFKA-9670; Reduce allocations in Metadata Response preparation (apache#8236) MINOR: fix Scala 2.13 build error introduced in apache#8083 (apache#8301) MINOR: enforce non-negative invariant for checkpointed offsets (apache#8297) MINOR: comment apikey types in generated switch (apache#8201) MINOR: Fix typo in CreateTopicsResponse.json (apache#8300) KIP-546: Implement describeClientQuotas and alterClientQuotas. (apache#8083) KAFKA-6647: Do note delete the lock file while holding the lock (apache#8267) KAFKA-9677: Fix consumer fetch with small consume bandwidth quotas (apache#8290) KAFKA-9533: Fix JavaDocs of KStream.transformValues (apache#8298) MINOR: reuse pseudo-topic in FKJoin (apache#8296) KAFKA-6145: Pt 2. Include offset sums in subscription (apache#8246) KAFKA-9714; Eliminate unused reference to IBP in `TransactionStateManager` (apache#8293) KAFKA-9718; Don't log passwords for AlterConfigs in request logs (apache#8294) KAFKA-8768: DeleteRecords request/response automated protocol (apache#7957) KAFKA-9685: Solve Set concatenation perf issue in AclAuthorizer ...
While discussing KIP-441 we realize we don't strictly enforce that all checkpointed offset sums are positive (or 0, though there's not much point to checkingpoint a 0 offset is there)?
Rather than awkwardly try handle this within every user/reader of the checkpoint file, we should just make a guarantee that all returned checkpointed offsets are positive.