KAFKA-4901: Make ProduceRequest thread-safe#2689

Closed
ijuma wants to merge 9 commits into apache:trunk from ijuma:produce-request-thread-safety

@ijuma
Member

@ijuma ijuma commented Mar 15, 2017

If request logging is enabled, ProduceRequest can be accessed
and mutated concurrently from a network thread (which calls
toString) and a request handler thread (which calls
clearPartitionRecords()).

That can lead to a ConcurrentModificationException when iterating
the partitionRecords map.

The underlying thread-safety issue has existed since the server
started using the Java implementation of ProduceRequest in 0.10.0.
However, we were incorrectly not clearing the underlying struct until
0.10.2, so toString itself was thread-safe until that change. In 0.10.2,
toString is no longer thread-safe and we could potentially see a
NullPointerException given the right set of interleavings between
toString and clearPartitionRecords although we haven't seen that
happen yet.

In trunk, we changed the requests to have a toStruct method
instead of creating a struct in the constructor and toString was
no longer printing the contents of the Struct. This accidentally
fixed the race condition, but it meant that request logging was less
useful.

A couple of days ago, AbstractRequest.toString was changed to
print the contents of the request by calling toStruct().toString()
and reintroduced the race condition. The impact is more visible
because we iterate over a HashMap, which proactively
checks for concurrent modification (unlike arrays).
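
The fail-fast behaviour described above can be reproduced without any threads. The following is a minimal sketch, not Kafka code: `FailFastIterationDemo` and its map of sizes are hypothetical stand-ins, and the `clear()` call inside the loop plays the role of the request handler thread mutating the map while the network thread iterates it. Note that the JDK documents this check as best-effort, so a real cross-thread race is not guaranteed to fail this cleanly.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo class; not part of Kafka.
final class FailFastIterationDemo {

    // Returns true if clearing the map while it is being iterated
    // triggers a ConcurrentModificationException.
    static boolean clearDuringIterationFails() {
        Map<String, Integer> partitionSizes = new HashMap<>();
        partitionSizes.put("test-0", 10);
        partitionSizes.put("test-1", 20);
        partitionSizes.put("foo-0", 30);
        try {
            for (Map.Entry<String, Integer> entry : partitionSizes.entrySet()) {
                // Stands in for another thread clearing the map mid-iteration.
                partitionSizes.clear();
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // The iterator noticed the structural modification on its next step.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + clearDuringIterationFails());
    }
}
```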

We will need a separate PR for 0.10.2.

ijuma added 2 commits March 15, 2017 13:25
If request logging is enabled, it can be accessed concurrently
from a network thread (which calls `toString`) and the
request handler thread (which calls `clearPartitionRecords()`).
More consistent line length usage.
@asfbot

asfbot commented Mar 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2192/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2190/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Mar 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2189/
Test FAILed (JDK 7 and Scala 2.10).

@ijuma
Member Author

ijuma commented Mar 15, 2017

@ijuma ijuma changed the title MINOR: Make ProduceRequest thread-safe KAFKA-4901; Make ProduceRequest thread-safe Mar 15, 2017
@ijuma ijuma changed the title KAFKA-4901; Make ProduceRequest thread-safe KAFKA-4901: Make ProduceRequest thread-safe Mar 15, 2017
@ijuma
Member Author

ijuma commented Mar 15, 2017

Looks like trunk is broken due to headers that don't have the required format. PR to fix it: #2690

@ijuma
Member Author

ijuma commented Mar 15, 2017

Note that the PR doesn't include tests yet. I wanted to get some feedback on the approach before doing that.

@ijuma
Member Author

ijuma commented Mar 15, 2017

retest this please

@asfbot

asfbot commented Mar 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2193/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Mar 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2196/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2194/
Test PASSed (JDK 8 and Scala 2.12).

Contributor

@hachikuji hachikuji left a comment

LGTM

.append(", acks=").append(acks)
.append(", timeout=").append(timeout);

if (partitionRecords == null)
Contributor

nit: missing open parenthesis?

Contributor

@onurkaraman onurkaraman left a comment

It looks like the formatting differs from Struct's toString formatting.


if (partitionRecords == null)
bld.append(", partitions=").append(Utils.mkString(partitions, ","));
else
Contributor

Does this print out the record contents as well? If so, that seems very excessive.

I think the old ProduceRequest had it right: print out the various fields and on initialization of ProduceRequest, store a size-per-partition mapping so that even after clearing records, we can log the size-per-partition.
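
A minimal sketch of that suggestion, under stated assumptions: `SizeTrackingRequest` is a hypothetical stand-in for ProduceRequest, partitions are keyed by plain strings instead of TopicPartition, and `ByteBuffer` stands in for MemoryRecords. The point it illustrates is that sizes computed once at construction remain readable after the records have been cleared.

```java
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for ProduceRequest; names and types are assumptions.
final class SizeTrackingRequest {
    // Computed once in the constructor and never mutated afterwards,
    // so any thread can read it safely.
    private final Map<String, Integer> partitionSizes;
    // Set to null by the request handler thread once the records are no longer needed.
    private volatile Map<String, ByteBuffer> partitionRecords;

    SizeTrackingRequest(Map<String, ByteBuffer> partitionRecords) {
        this.partitionRecords = partitionRecords;
        Map<String, Integer> sizes = new LinkedHashMap<>();
        for (Map.Entry<String, ByteBuffer> entry : partitionRecords.entrySet())
            sizes.put(entry.getKey(), entry.getValue().remaining());
        this.partitionSizes = Collections.unmodifiableMap(sizes);
    }

    // Drops the reference to the record data so it can be garbage collected.
    void clearPartitionRecords() {
        partitionRecords = null;
    }

    boolean recordsCleared() {
        return partitionRecords == null;
    }

    // Still available after clearPartitionRecords(), which is what logging needs.
    Map<String, Integer> partitionSizes() {
        return partitionSizes;
    }
}
```

Because logging would only ever touch the immutable size map in this sketch, it never iterates the structure that the other thread clears.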

Contributor

Yeah, actually I agree with this. These messages have started to pollute some test logs.

Contributor

In other words, what I'm trying to say is maybe we can simply avoid the race by not attempting to print the records at all.

Member Author

Thanks for the feedback. I'll try to address the comments:

  1. Formatting differs from Struct's. That's right, I used the same format as ProduceRequest.Builder as that was easier, but I can use Struct's formatting if people prefer that (there's some inconsistency, either way).

  2. It prints a summary of the record contents (i.e. Record(magic = %d, attributes = %d, compression = %s, crc = %d, %s = %d, key = %d bytes, value = %d bytes)) since that's the current behaviour in trunk. Note that the old Scala class only printed the partitions and sizes if TRACE logging was enabled. For DEBUG logging, the partitions information wasn't printed at all. We lost that distinction with the Java classes. I can fix that for Fetch and Produce requests in this PR and we can tackle the rest later. It seems to me that printing the record summary if TRACE logging is enabled would make sense. However, since the information won't be available in many/most cases, it makes sense to keep it simple and never print more than the size.

Contributor

@becketqin becketqin left a comment

@ijuma Good catch and thanks for the patch. Left one comment about an alternative approach. See if that makes sense.

return struct;
}

@Override
Contributor

It seems that toString() would produce a different output format depending on whether the request has been cleared. An alternative is to store the toString() result on construction. This introduces some overhead (hopefully not too much) but would preserve the original information of the request.

Contributor

If we match the old ProduceRequest behavior and don't include the actual record data in the toString (which I think we should change), then your proposal ends up essentially matching what I said.

Member Author

@ijuma ijuma Mar 15, 2017

Thanks for the feedback @becketqin. Generating the string is not cheap and we don't need it unless trace logging is enabled. What do you think of the approach suggested here:

#2689 (comment)

Contributor

That makes sense. I was not thinking about printing the entire message content out, just the record summary.

@ijuma
Member Author

ijuma commented Mar 15, 2017

I updated the PR as discussed although I left FetchRequest/Response alone for now since it's better for this PR to focus on ProduceRequest. I also started a system tests build for the Streams tests as they were failing due to this issue:

https://jenkins.confluent.io/job/system-test-kafka-branch-builder/770/console

@onurkaraman
Contributor

Thanks @ijuma. Can you provide a snippet of what the actual output looks like now with verbose/nonverbose logging?

@Override
public Struct toStruct() {
    // Store it in a local variable to protect against concurrent updates
    Map<TopicPartition, MemoryRecords> partitionRecords = partitionRecordsOrFail();
Contributor

It seems that we don't need this line and partitionRecordsOrFail() any more?

Member Author

@junrao, I added that to make the code fail-fast in case assumptions are broken in the future. Calling toStruct after clearPartitionRecords is a bug and we should fail with an appropriate error message if that happens.
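
The fail-fast idea can be sketched roughly as follows. This is an illustration under assumptions rather than the code in the PR: `SnapshotRequest`, `totalSize()` and the exception message are hypothetical, the map values are plain integers instead of MemoryRecords, and clearing is assumed to set the field to null. The field is read exactly once into a local variable so that the null check and the subsequent iteration see the same reference.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ProduceRequest; names and types are assumptions.
final class SnapshotRequest {
    // volatile so that a clear performed on one thread is visible to others.
    private volatile Map<String, Integer> partitionRecords;

    SnapshotRequest(Map<String, Integer> partitionRecords) {
        this.partitionRecords = new HashMap<>(partitionRecords);
    }

    void clearPartitionRecords() {
        partitionRecords = null;
    }

    private Map<String, Integer> partitionRecordsOrFail() {
        // Read the field once: checking it and then re-reading it could
        // observe null in between if another thread clears it.
        Map<String, Integer> records = this.partitionRecords;
        if (records == null)
            throw new IllegalStateException(
                "The partition records are no longer available because clearPartitionRecords() has been invoked.");
        return records;
    }

    // Stands in for toStruct(): any method that needs the records works on
    // the local snapshot and fails with a clear message after a clear.
    int totalSize() {
        Map<String, Integer> records = partitionRecordsOrFail();
        int total = 0;
        for (int size : records.values())
            total += size;
        return total;
    }
}
```

Without the helper, a caller that runs after the clear would hit a bare NullPointerException; the explicit check turns the broken assumption into a descriptive error.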

Contributor

Thanks. That makes sense.

@ijuma
Member Author

ijuma commented Mar 16, 2017

Regarding the system tests, there was a transient failure that did not happen on a subsequent re-run:

https://jenkins.confluent.io/job/system-test-kafka-branch-builder/771/console

@onurkaraman, the output would look like:

{acks=1,timeout=5000}
{acks=1,timeout=5000,partitionSizes=[test-0=10,test-1=20,foo-0=30]}
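
For illustration, the two lines above could be produced by something like the following sketch, assuming the first line is the non-verbose form and the second the verbose one. `ProduceRequestSummary` is a hypothetical class, partitions are keyed by plain strings, and `StringJoiner` requires Java 8 or later; the actual method in the PR may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper that mirrors the output format quoted above.
final class ProduceRequestSummary {
    private final short acks;
    private final int timeout;
    // Insertion-ordered copy so the printed order is deterministic.
    private final Map<String, Integer> partitionSizes;

    ProduceRequestSummary(short acks, int timeout, Map<String, Integer> partitionSizes) {
        this.acks = acks;
        this.timeout = timeout;
        this.partitionSizes = new LinkedHashMap<>(partitionSizes);
    }

    // verbose=false prints only the scalar fields; verbose=true adds per-partition sizes.
    String toString(boolean verbose) {
        StringBuilder bld = new StringBuilder();
        bld.append("{acks=").append(acks)
           .append(",timeout=").append(timeout);
        if (verbose) {
            StringJoiner sizes = new StringJoiner(",", "[", "]");
            for (Map.Entry<String, Integer> entry : partitionSizes.entrySet())
                sizes.add(entry.getKey() + "=" + entry.getValue());
            bld.append(",partitionSizes=").append(sizes);
        }
        return bld.append("}").toString();
    }
}
```

Only sizes are printed, never record contents, so the string can be built after the records have been cleared.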

@asfbot

asfbot commented Mar 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2207/
Test PASSed (JDK 8 and Scala 2.11).

Contributor

@junrao junrao left a comment

@ijuma : LGTM. Just one minor comment.

bld.append("{acks=").append(acks)
   .append(",timeout=").append(timeout);

if (verbose)
Contributor

In the non-verbose case, maybe print the number of topic partitions?

Member Author

Good suggestion. I added that.

@ijuma
Member Author

ijuma commented Mar 16, 2017

@asfbot

asfbot commented Mar 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2207/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Mar 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2210/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2208/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Mar 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2216/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2213/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Mar 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2214/
Test PASSed (JDK 8 and Scala 2.12).

@junrao
Contributor

junrao commented Mar 16, 2017

@ijuma : Thanks for the patch. LGTM.

@asfgit asfgit closed this in 1659ca1 Mar 16, 2017
@ijuma ijuma deleted the produce-request-thread-safety branch September 5, 2017 09:06
6 participants