KAFKA-10306: GlobalThread should fail on InvalidOffsetException #9075
mjsax merged 5 commits into apache:trunk
| stateRestoreAdapter.restoreBatch(restoreRecords); | ||
| stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size()); | ||
| restoreCount += restoreRecords.size(); | ||
| } catch (final InvalidOffsetException recoverableException) { |
This is the actual bug: we swallow the exception. Because we never seek() to a valid offset, the next poll() hits the same exception again, so we never recover and loop forever.
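To make the failure mode concrete, here is a minimal sketch of the pattern described above. It does not use the real Kafka classes: `FakeConsumer` and `FakeInvalidOffsetException` are hypothetical stand-ins, and the retry loop is bounded by `maxAttempts` only so the sketch terminates (the real loop had no such bound).

```java
import java.util.Collections;
import java.util.List;

class SwallowedOffsetExceptionSketch {

    /** Hypothetical stand-in for Kafka's InvalidOffsetException. */
    static class FakeInvalidOffsetException extends RuntimeException {
        FakeInvalidOffsetException(String message) {
            super(message);
        }
    }

    /** Hypothetical consumer: while its position is invalid, every poll() fails until seek() is called. */
    static class FakeConsumer {
        private boolean positionValid = false;
        int pollCalls = 0;

        List<String> poll() {
            pollCalls++;
            if (!positionValid) {
                throw new FakeInvalidOffsetException("offset out of range");
            }
            return Collections.singletonList("record");
        }

        void seek(long offset) {
            positionValid = true;
        }
    }

    /** Buggy pattern: swallow the exception and poll again without seeking. Returns the number of failed polls. */
    static int restoreSwallowing(FakeConsumer consumer, int maxAttempts) {
        int failures = 0;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                consumer.poll();
                return failures; // never reached while the position stays invalid
            } catch (FakeInvalidOffsetException swallowed) {
                failures++; // nothing repairs the position, so the next poll() fails the same way
            }
        }
        return failures;
    }

    /** Fail-fast pattern: let the exception propagate to the caller. */
    static void restoreFailingFast(FakeConsumer consumer) {
        consumer.poll();
    }
}
```

With the swallowing variant, every attempt fails until something calls `seek()`; with the fail-fast variant, the caller sees the exception on the first `poll()` and can decide how to shut down.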
| void flushState(); | ||
|
|
||
| void close() throws IOException; | ||
| void close(final boolean wipeStateStore) throws IOException; |
We could also not wipe out the store and let the user do it manually. A manual cleanup is actually required nowadays, so this is really a small side "improvement".
Note that we wipe out the whole global task dir here. In contrast, users could do a manual per-store wipe-out. But since we do the same coarse-grained wipe-out for all tasks, and we already have a ticket for "per store cleanup", I thought it would be OK for now.
Let me know what you think.
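For readers following along, a rough sketch of what a coarse-grained wipe on close could look like. This is not the code in this PR: `DirBackedMaintainer`, the temp-directory layout, and the helper `taskDirExistsAfterClose` are all hypothetical, and only the `close(boolean wipeStateStore)` shape is taken from the diff above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class WipeOnCloseSketch {

    /** Hypothetical maintainer whose state lives under a single task directory. */
    static class DirBackedMaintainer {
        private final Path taskDir;

        DirBackedMaintainer(Path taskDir) {
            this.taskDir = taskDir;
        }

        /** Coarse-grained wipe: deletes the whole task directory, not individual stores. */
        void close(boolean wipeStateStore) throws IOException {
            if (!wipeStateStore) {
                return;
            }
            List<Path> deepestFirst;
            try (Stream<Path> paths = Files.walk(taskDir)) {
                // Reverse order puts children before their parent directories.
                deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            }
            for (Path path : deepestFirst) {
                Files.delete(path);
            }
        }
    }

    /** Demo helper: builds a throwaway task dir, closes the maintainer, reports whether the dir survived. */
    static boolean taskDirExistsAfterClose(boolean wipeStateStore) {
        try {
            Path taskDir = Files.createTempDirectory("global-task-sketch");
            Files.createFile(Files.createDirectory(taskDir.resolve("store-a")).resolve("data"));
            Files.createFile(taskDir.resolve(".checkpoint"));

            DirBackedMaintainer maintainer = new DirBackedMaintainer(taskDir);
            maintainer.close(wipeStateStore);
            boolean exists = Files.exists(taskDir);
            if (exists) {
                maintainer.close(true); // clean up the temp dir used by this demo
            }
            return exists;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A per-store cleanup would instead delete only the affected store's subdirectory; the sketch deliberately removes everything under the task directory to mirror the coarse-grained behaviour discussed in the comment.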
| stateMaintainer.flushState(); | ||
| lastFlush = now; | ||
| } | ||
| } catch (final InvalidOffsetException recoverableException) { |
We just let the original exception bubble up, to be able to wipe out the store. This is also just a side "improvement"; we could also just die and let users clean up the state directory manually. However, it seems better to wipe it out directly.
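A small sketch of the control flow this implies, under stated assumptions: `RecordingMaintainer`, `FakeInvalidOffsetException`, and `runLoop` are hypothetical names, and wrapping the cause in `IllegalStateException` is only a placeholder for whatever fatal exception the thread actually raises.

```java
class BubbleUpAndWipeSketch {

    /** Hypothetical stand-in for Kafka's InvalidOffsetException. */
    static class FakeInvalidOffsetException extends RuntimeException {
        FakeInvalidOffsetException(String message) {
            super(message);
        }
    }

    /** Hypothetical maintainer that fails after a few polls and records how it was closed. */
    static class RecordingMaintainer {
        private int pollsBeforeFailure;
        Boolean closedWithWipe = null; // null until close() is called

        RecordingMaintainer(int pollsBeforeFailure) {
            this.pollsBeforeFailure = pollsBeforeFailure;
        }

        void pollAndUpdate() {
            if (pollsBeforeFailure-- <= 0) {
                throw new FakeInvalidOffsetException("offset out of range");
            }
        }

        void close(boolean wipeStateStore) {
            closedWithWipe = wipeStateStore;
        }
    }

    /** The inner update path no longer catches the exception; the outer loop decides to wipe and die. */
    static void runLoop(RecordingMaintainer maintainer) {
        boolean wipeStateStore = false;
        try {
            while (true) {
                maintainer.pollAndUpdate();
            }
        } catch (FakeInvalidOffsetException recoverableException) {
            wipeStateStore = true; // local state can no longer be trusted
            throw new IllegalStateException("Global state is out of sync; shutting down", recoverableException);
        } finally {
            maintainer.close(wipeStateStore);
        }
    }
}
```

The point of the `finally` block is that the wipe decision is made where the exception is observed, while the close itself happens exactly once regardless of how the loop exits.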
| } | ||
|
|
||
| @Test | ||
| public void shouldRecoverFromInvalidOffsetExceptionAndRestoreRecords() { |
This test was broken: it throws the InvalidOffsetException only once, and thus the second poll() succeeds. However, this is not how a real consumer behaves.
| assertFalse(globalStreamThread.stillRunning()); | ||
| } | ||
|
|
||
| @SuppressWarnings("unchecked") |
Just a little side cleanup :)
Otherwise, it LGTM. Feel free to merge whether or not you like my suggestions ;)
…nals/GlobalStateUpdateTask.java Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
…nals/GlobalStateUpdateTask.java Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
…nals/GlobalStreamThread.java Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
…nals/GlobalStreamThread.java
Retest this please.
* KAFKA-10306: GlobalThread should fail on InvalidOffsetException
* Update streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
  Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
* Update streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
  Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
* Update streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
  Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
* Update streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
  Co-authored-by: John Roesler <vvcephei@users.noreply.github.com>
|
Merged to |
Fix for 2.6 blocker bug.
Call for review @guozhangwang @vvcephei (\cc @rhauch)