
KAFKA-9838; Add log concurrency test and fix minor race condition#8476

Merged
hachikuji merged 2 commits into apache:trunk from hachikuji:KAFKA-9838
Apr 16, 2020

Conversation

@hachikuji
Contributor

The patch adds a new test case for validating concurrent read/write behavior in the `Log` implementation. In the process of verifying this, we found a race condition in `read`. The previous logic checks whether the start offset is equal to the end offset before collecting the high watermark. It is possible that the log is truncated in between these two conditions, which could cause the high watermark to be equal to the log end offset. When this happens, `LogSegment.read` fails because it is unable to find the starting position to read from.
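The check-then-act race described above can be illustrated with a minimal sketch (illustrative Python, not Kafka's code; `MiniLog` and every name in it are hypothetical stand-ins):

```python
import threading

class MiniLog:
    """Toy append-only log; offsets are list indices."""
    def __init__(self):
        self.records = ["a", "b", "c", "d"]
        self.lock = threading.Lock()

    def end_offset(self):
        return len(self.records)

    def truncate_to(self, offset):
        # A concurrent truncation can shrink the log at any time.
        with self.lock:
            del self.records[offset:]

    def read(self, start_offset):
        # Check-then-act: the range check and the capture of the max offset
        # are two separate reads of mutable state, so a truncation can land
        # between them and leave start_offset equal to the captured offset.
        if start_offset > self.end_offset():
            raise ValueError(f"offset {start_offset} out of range")
        max_offset = self.end_offset()  # may have shrunk since the check
        # The fix the patch makes: treat equality as an empty fetch instead
        # of attempting a segment position lookup that has nothing to find.
        if start_offset == max_offset:
            return []
        return self.records[start_offset:max_offset]
```

Python slicing happens to tolerate the empty range, but the analogous offset-to-position lookup in `LogSegment.read` does not, which is why the equality guard returns early.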

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor

@guozhangwang guozhangwang left a comment


Thanks @hachikuji , just a few minor questions. Nice catch!


```scala
if (startOffset == maxOffsetMetadata.messageOffset) {
  return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
} else if (startOffset > maxOffsetMetadata.messageOffset) {
```
Contributor


Why do we use `maxOffsetMetadata` in this case, while in the else if we use `startOffsetMetadata` as `emptyFetchDataInfo#fetchOffsetMetadata`? Is there a specific rationale for that?

Contributor Author


If the start offset matches the max offset, then we do not need to look up the metadata since we already have it. This is really the bug fix. Without it, we would proceed to read from the log segment, which results in an error like the following:

```
java.util.concurrent.ExecutionException: org.apache.kafka.common.KafkaException: java.io.EOFException: Failed to read `log header` from file channel `sun.nio.ch.FileChannelImpl@cc86429`. Expected to read 17 bytes, but reached end of file after reading 0 bytes. Started read from position 85.
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at kafka.log.LogConcurrencyTest.testUncommittedDataNotConsumed(LogConcurrencyTest.scala:75)
	at kafka.log.LogConcurrencyTest.testUncommittedDataNotConsumedFrequentSegmentRolls(LogConcurrencyTest.scala:61)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.apache.kafka.common.KafkaException: java.io.EOFException: Failed to read `log header` from file channel `sun.nio.ch.FileChannelImpl@cc86429`. Expected to read 17 bytes, but reached end of file after reading 0 bytes. Started read from position 85.
	at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:40)
	at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
	at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
	at org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
	at org.apache.kafka.common.record.FileRecords.searchForOffsetWithSize(FileRecords.java:302)
	at kafka.log.LogSegment.translateOffset(LogSegment.scala:275)
	at kafka.log.LogSegment.read(LogSegment.scala:298)
	at kafka.log.Log.$anonfun$read$2(Log.scala:1515)
	at kafka.log.Log.read(Log.scala:2333)
	at kafka.log.LogConcurrencyTest$ConsumerTask.call(LogConcurrencyTest.scala:95)
	at kafka.log.LogConcurrencyTest$ConsumerTask.call(LogConcurrencyTest.scala:85)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.EOFException: Failed to read `log header` from file channel `sun.nio.ch.FileChannelImpl@cc86429`. Expected to read 17 bytes, but reached end of file after reading 0 bytes. Started read from position 85.
	at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:966)
	at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:68)
	at org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:41)
	at org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35)
	... 14 more
```

Contributor


Yeah, I understand this is the bug fix; I'm only curious why we used

`emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)`

and not

`emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)`

I realized it is just the same, so I was curious whether you intentionally used `maxOffsetMetadata` for some other reason, but I think it is just to avoid an unnecessary `convertToOffsetMetadataOrThrow` :)

Contributor Author

@hachikuji hachikuji Apr 15, 2020


It lets us skip the call to `convertToOffsetMetadataOrThrow` in order to find `startOffsetMetadata`. I tried changing the check below to `>=` and hit a similar error when trying to translate the offset in `LogSegment`. We might also be able to fix this by adding some additional checks in `LogSegment` to avoid the offset translation when the max position matches the start position of the requested offset.
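To make the "skip the lookup" point concrete, here is a toy sketch (illustrative Python; the snake_case names mirror the Scala identifiers in the discussion but are not Kafka's API):

```python
from dataclasses import dataclass

@dataclass
class LogOffsetMetadata:
    """Hypothetical stand-in: an offset plus its physical location."""
    message_offset: int
    segment_base_offset: int = -1
    relative_position: int = -1

def convert_to_offset_metadata_or_throw(offset):
    # Stand-in for the index/segment lookup that the equality branch avoids;
    # in the real code this lookup costs work and can throw.
    raise AssertionError("segment lookup should be skipped when offsets match")

def empty_fetch_data_info(metadata):
    return {"fetch_offset_metadata": metadata, "records": []}

def read(start_offset, max_offset_metadata):
    # Equality case: the metadata for start_offset is exactly the metadata
    # we already hold for the max offset, so reuse it and skip the lookup.
    if start_offset == max_offset_metadata.message_offset:
        return empty_fetch_data_info(max_offset_metadata)
    # Out-of-range case: here a lookup (which may throw) is unavoidable.
    if start_offset > max_offset_metadata.message_offset:
        start_metadata = convert_to_offset_metadata_or_throw(start_offset)
        return empty_fetch_data_info(start_metadata)
    return {"fetch_offset_metadata": max_offset_metadata, "records": ["..."]}
```

Both early-return branches produce an empty fetch, but only the equality branch can reuse the metadata already in hand; the `>` branch must pay for (and risk an exception from) the offset-to-position conversion.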

```scala
if (iter.hasNext) {
  val consumedBatch = iter.next()
  try {
    assertEquals("Consumed batch with unexpected leader epoch",
```
Contributor


Could we check last / next offset of the batch as well?

Contributor Author


We could, but I thought it would be overkill since the generation logic generates batches which are unique by base offset and leader epoch.

Contributor


Cool

```scala
  testUncommittedDataNotConsumed(createLog(logConfig))
}

def testUncommittedDataNotConsumed(log: Log): Unit = {
```
Contributor


I'm just curious how long a single run would take with 5000 records and a max batch size of 10? I'm a bit paranoid about it taking too long.

Contributor Author


It takes about 10 seconds locally, which seems reasonable for a concurrency test.

Contributor

@guozhangwang guozhangwang left a comment


LGTM.

@hachikuji
Contributor Author

retest this please

@hachikuji hachikuji merged commit 413c4b5 into apache:trunk Apr 16, 2020
hachikuji added a commit that referenced this pull request Apr 16, 2020

Reviewers: Guozhang Wang <wangguoz@gmail.com>
Kvicii pushed a commit to Kvicii/kafka that referenced this pull request Apr 19, 2020
qq619618919 pushed a commit to qq619618919/kafka that referenced this pull request May 12, 2020
andrey-klochkov-liftoff pushed a commit to liftoffio/kafka that referenced this pull request Aug 17, 2020