
[KAFKA-6608] Add timeout parameter to methods which retrieves offsets#5014

Merged
hachikuji merged 18 commits into apache:trunk from ConcurrencyPractitioner:KAFKA-6608
May 30, 2018
Conversation

@ConcurrencyPractitioner
Contributor

@ConcurrencyPractitioner ConcurrencyPractitioner commented May 12, 2018

Currently, this PR is based on what was agreed upon in KIP-266. For further information, please see:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75974886

@ConcurrencyPractitioner
Contributor Author

@hachikuji Do you have any comments?

@ijuma ijuma requested a review from hachikuji May 21, 2018 18:57
@ijuma ijuma added this to the 2.0.0 milestone May 21, 2018
@hachikuji
Contributor

@ConcurrencyPractitioner Sorry for the delay. Can you rebase please? I have merged @vvcephei's patch which includes adding timeout behavior for fetching committed offsets.

@hachikuji
Contributor

Also, do you plan to implement the rest of the timeout APIs or shall I create a separate JIRA?

@ConcurrencyPractitioner
Contributor Author

The test failures appear to be unrelated. The JDK 10 and JDK 8 builds each have one failing test, but they are different tests, so they are probably just flaky.

@ConcurrencyPractitioner
Contributor Author

Hi @hachikuji, I have covered the remaining methods in KIP-266. Some of the methods I added, however, do not yet throw TimeoutException as their descriptions imply; I will get around to having those methods throw TimeoutException when the timeout is exceeded. Other than that, please review. Thanks.

Contributor

@hachikuji hachikuji left a comment

Thanks, left a few comments.

this.assignors = assignors;
}

public long requestTimeoutMs() {
Contributor

We shouldn't be adding unneeded public APIs.

/**
* @see KafkaConsumer#commitSync(Map)
*/
@Deprecated
Contributor

According to the KIP, the only methods to be deprecated are close(TimeUnit, long) and poll(long).

/**
* @see KafkaConsumer#commitSync(Map, Duration
*/
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets,
Contributor

nit: the public is redundant for an interface. Same for the other methods below.

acquireAndEnsureOpen();
try {
long currMillis = time.milliseconds();
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getAllTopicMetadata(timeout.toMillis());
Contributor

getAllTopicMetadata already raises TimeoutException, so I don't think we need any of this logic.

public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
acquireAndEnsureOpen();
try {
for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
Contributor

This is the same logic that we have in offsetsForTimes(Map). We can avoid the duplication by having that method call this one. Similarly for the other APIs.
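The deduplication suggested here can be sketched as follows. This is a hypothetical stand-in, not the real KafkaConsumer internals: the method names mirror the PR, but the types and the 30-second default are illustrative assumptions.

```java
import java.time.Duration;
import java.util.Map;

// Legacy overload delegates to the new Duration-based overload,
// so the lookup logic lives in exactly one place.
class OffsetsForTimesSketch {
    static final long REQUEST_TIMEOUT_MS = 30_000L; // illustrative default

    // New API: validation, ListOffsets requests, and timeout handling
    // would all happen here.
    static Map<String, Long> offsetsForTimes(Map<String, Long> timestampsToSearch,
                                             Duration timeout) {
        if (timeout.isNegative())
            throw new IllegalArgumentException("timeout must not be negative");
        return timestampsToSearch; // placeholder for the real lookup
    }

    // Old API: a one-line delegation instead of duplicated logic.
    static Map<String, Long> offsetsForTimes(Map<String, Long> timestampsToSearch) {
        return offsetsForTimes(timestampsToSearch, Duration.ofMillis(REQUEST_TIMEOUT_MS));
    }
}
```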

* defined
* @return true if all assigned positions have a position
*/
private boolean updateFetchPositions(long start, long timeoutMs) {
Contributor

Can you explain why we cannot use the other updateFetchPositions?

Contributor Author

This was before John changed the old updateFetchPositions to take a timeout argument. Before that, I had to improvise and thought this was needed. However, this code is now irrelevant, so I will just switch over.

* @param timeout The maximum allowable duration of the method
* @throws TimeoutException if committed offsets cannot be retrieved within set amount of time
*/
public void refreshCommittedOffsetsIfNeeded(long startMs, long timeout) {
Contributor

Similar to the previous comment. Why do we need new methods that duplicate all this logic?


assertEquals(539L, consumer.position(tp0));

consumer.poll(0);
Contributor

This change seems not needed.

@@ -1,4 +1,5 @@
/**

Contributor

nit: please remove this

* since how much time one would need to block for Kafka Streams is
* still unknown
*/
private static final int DEFAULT_BLOCKING_TIME = 20000;
Contributor

I'm not sure I understand why this should be necessary. We're not removing any functionality from the consumer. If possible, I'd prefer to do changes for streams in a separate PR.

Contributor Author

To tell the truth, this was never meant to be permanent. I did this simply as a temporary marker to allow me to pass the tests using KafkaStreams. I thought that in a future PR we would remove this and come up with some other apparatus to replace what we have right now.

Contributor

Can we leave this for a follow-up? I still don't understand why we should need to touch anything in streams.

Contributor

@ConcurrencyPractitioner Can you respond here? I would prefer to keep the current streams implementation and leave improvements to a follow-up PR.

Contributor

How about retrieving StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG and using that as the timeout?

Contributor

Why don't we just leave the current blocking implementation and fix this in a separate PR? Why does it need to be part of this PR?

Contributor

Unless there is a good reason why these changes must be here, please revert them. This will block merging the PR.

Contributor

See the comment in the change for StreamThread.java

Construction of StoreChangelogReader in StreamThread needs a timeout parameter.

        final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreConsumer, userStateRestoreListener, logContext);

I assume you suggest using Long.MAX_VALUE for that parameter.

Contributor

I am suggesting leaving the code in as it is prior to this patch. We do not need to make the change in StoreChangelogReader to use the timeout in this patch. The old position() has not been deprecated and will continue to work as it has. In a follow-up, we can try to improve the behavior.

Contributor

Yeah, I agree. Streams code shouldn't show up in this PR diff at all. If there are failing streams tests, then I think there is something wrong with the KafkaConsumer implementation.

Contributor

@hachikuji hachikuji left a comment

Thanks for the updates. Left a few more comments.

* @see KafkaConsumer#commitSync(Map, Duration
*/
void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets,
final Duration duration);
Contributor

nit: misaligned

* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
*/
public long position(TopicPartition partition) {
return position(partition, Duration.ofMillis(requestTimeoutMs));
Contributor

We should preserve the current behavior, which is to block indefinitely for this method.

Long offset = this.subscriptions.position(partition);
while (offset == null) {
final long startMs = time.milliseconds();
long finishMs = time.milliseconds();
Contributor

nit: initialize to startMs?

Long.MAX_VALUE
);
}
Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition),
Contributor

The second argument to fetchCommittedOffsets is the timeout, not the current time. Also, when this method times out, it will return null. So we should check for that and raise TimeoutException.
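The fix asked for here can be sketched with stand-in types (the Coordinator interface and the String-keyed map below are illustrative assumptions, not the real ConsumerCoordinator API): pass the timeout, not the current time, and translate a null result into a TimeoutException rather than dereferencing it.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

class CommittedSketch {
    static class TimeoutException extends RuntimeException {
        TimeoutException(String message) { super(message); }
    }

    interface Coordinator {
        // Returns null if the offsets could not be fetched within timeoutMs.
        Map<String, Long> fetchCommittedOffsets(Set<String> partitions, long timeoutMs);
    }

    static Long committed(Coordinator coordinator, String partition, long timeoutMs) {
        Map<String, Long> offsets =
            coordinator.fetchCommittedOffsets(Collections.singleton(partition), timeoutMs);
        if (offsets == null)
            throw new TimeoutException("Could not fetch committed offset for " + partition
                    + " within " + timeoutMs + "ms");
        return offsets.get(partition);
    }
}
```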

Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
new MetadataRequest.Builder(Collections.singletonList(topic), true), requestTimeoutMs);
new MetadataRequest.Builder(Collections.singletonList(topic), true), timeoutMs);
if (topicMetadata.isEmpty()) {
Contributor

This check is unnecessary since getTopicMetadata will raise TimeoutException if it times out. I realize there is a little inconsistency between some of these internal APIs. It is an area for improvement. The reason it is this way is that timeouts in poll() are "normal" and do not cause an exception.

consumer.poll(Duration.ZERO);

assertEquals(539L, consumer.position(tp0));
assertEquals(539L, consumer.position(tp0, Duration.ofSeconds(2)));
Contributor

Are these changes needed?

}

@Test
@Test(expected = classOf[org.apache.kafka.common.errors.TimeoutException])
Contributor

Why are we changing the behavior of these tests? They are intended to verify ACL behavior. We should just keep the current implementation.

}

@Test
@Test(expected = classOf[org.apache.kafka.common.errors.TimeoutException])
Contributor

Same comment. If a test case did not previously timeout, we shouldn't change it.

@@ -1,4 +1,5 @@
/**

Contributor

nit: please remove

* since how much time one would need to block for Kafka Streams is
* still unknown
*/
private static final int DEFAULT_BLOCKING_TIME = 20000;
Contributor

Can we leave this for a follow-up? I still don't understand why we should need to touch anything in streams.

Contributor

@hachikuji hachikuji left a comment

A few more comments.

void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);

/**
* @see KafkaConsumer#commitSync(Map, Duration
Contributor

nit: missing end parenthesis

Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);

/**
* @see KafkaConsumer#offsetsForTimes(java.util.Map, Duration)
Contributor

nit: we can drop the java.util. prefix on all of these methods.

*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
commitSync(offsets, Duration.ofMillis(requestTimeoutMs));
Contributor

We should use Long.MAX_VALUE to keep the current behavior.
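The suggested change can be sketched as follows; names are illustrative stand-ins, and lastTimeout exists only so the delegation is observable. The point is that the legacy commitSync(Map) keeps its historical block-indefinitely semantics by delegating with an effectively infinite timeout rather than silently switching to requestTimeoutMs.

```java
import java.time.Duration;
import java.util.Map;

class CommitSyncSketch {
    static Duration lastTimeout; // captured for illustration only

    static void commitSync(Map<String, Long> offsets, Duration timeout) {
        lastTimeout = timeout;
        // ... send OffsetCommit requests, retrying until success or timeout ...
    }

    static void commitSync(Map<String, Long> offsets) {
        // Preserve the pre-KIP-266 behavior: wait as long as it takes.
        commitSync(offsets, Duration.ofMillis(Long.MAX_VALUE));
    }
}
```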

throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
Long offset = this.subscriptions.position(partition);
while (offset == null) {
updateFetchPositions(10);
Contributor

Why don't we just call position(partition, Duration.ofMillis(Long.MAX_VALUE))?

Contributor

@vvcephei vvcephei May 29, 2018

Not sure how important this is, but Long.MAX_VALUE isn't exactly the current behavior. It would only make a difference if someone actually waited roughly 292 million years for the call to complete. This probably doesn't happen, but in my PR I opted to keep the exact current behavior of blocking forever. Please take this as a general remark.

That said, I'm not sure you meant to drop from Long.MAX_VALUE and retryBackoffMs in the old code to 10 ms here.

*/
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
return committed(partition, Duration.ofMillis(requestTimeoutMs));
Contributor

We should use Long.MAX_VALUE to keep the current behavior.

// batch update fetch positions for any partitions without a valid position
while (!updateFetchPositions(Long.MAX_VALUE)) {
log.warn("Still updating fetch positions");
updateFetchPositions(timeout - (time.milliseconds() - startMs));
Contributor

Can we use remainingTimeAtLeastZero? Also, if updateFetchPositions returns false, we can just break.

finishMs = time.milliseconds();
final long remainingTime = Math.max(0, timeout - (finishMs - startMs));

if (remainingTime > 0) {
Contributor

We can probably skip this check and just do the following:

client.poll(remainingTimeAtLeastZero(timeout, finishMs - startMs));
offset = this.subscriptions.position(partition);
finishMs = time.milliseconds();
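The helper the reviewer refers to can be reconstructed as a sketch; the name remainingTimeAtLeastZero matches the suggestion above, but the signature is an assumption. It simply clamps the leftover time budget at zero so the poll call never receives a negative timeout.

```java
class RemainingTimeSketch {
    // Leftover budget after elapsedMs have been spent, never negative.
    static long remainingTimeAtLeastZero(long timeoutMs, long elapsedMs) {
        return Math.max(0L, timeoutMs - elapsedMs);
    }
}
```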

* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws IllegalArgumentException if the target timestamp is negative
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
Contributor

This message is not accurate for this method. The other methods need similar changes.

Contributor Author

I will probably remove these, particularly since we want to preserve the behavior of the old methods (e.g. blocking infinitely). In other words, these methods I think should never throw a TimeoutException.

Contributor

That is not quite right. We do want them to throw TimeoutException. I was just pointing out that the timeout used is the one passed directly, not the request timeout.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
assertEquals(1, records.count());
assertEquals(11L, consumer.position(tp0));
assertEquals(11L, consumer.position(tp0, Duration.ofSeconds(2)));
Contributor

Unless there's a good reason for these changes, can you revert them?

Contributor Author

Hi @hachikuji, I reverted all test changes.

@@ -1,4 +1,5 @@
/**

Contributor

Again, please remove this.

try {
coordinator.commitOffsetsSync(new HashMap<>(offsets), Long.MAX_VALUE);
if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), totalWaitTime)) {
throw new TimeoutException("Commiting offsets synchronously took too long.");
Contributor

Nit: committing

General remark: it might be nice for debugging to include the string value of duration instead of "too long".

final long totalWaitTime = duration.toMillis();
try {
coordinator.commitOffsetsSync(new HashMap<>(offsets), Long.MAX_VALUE);
if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), totalWaitTime)) {
Contributor

@vvcephei vvcephei May 29, 2018

Is this wrapping offsets to avoid mutating the input? If so, I think Collections#unmodifiableMap would be more efficient.
(edit) Oh, I see this pre-existed your change.

Contributor

So true story, we had a user who was passing a synchronized collection which ultimately caused a deadlock. We decided that it was worth the copy to keep such weird behavior out of the consumer internals.

@vvcephei
Contributor

Thanks for the patch, @ConcurrencyPractitioner.

I've left a few comments.

I think that for KIP-266, we decided to deprecate the old methods, but I didn't see any deprecation annotations, except on close. Note that they should go at a minimum on the Consumer interface, since subclasses will inherit the deprecation. I recommend also adding them to KafkaConsumer, just because we have the JavaDoc on the implementation, not the interface (I'm not sure why).
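The deprecation placement described above can be sketched with simplified stand-ins for Consumer/KafkaConsumer (names and signatures are illustrative): annotate the interface method, and repeat the annotation on the implementation so the warning and javadoc note are visible regardless of which declared type callers use.

```java
import java.time.Duration;

interface SketchConsumer {
    /** @deprecated use {@link #poll(Duration)} instead */
    @Deprecated
    String poll(long timeoutMs);

    String poll(Duration timeout);
}

class SketchKafkaConsumer implements SketchConsumer {
    // Repeated here: @Deprecated on the interface alone does not warn
    // callers who hold a SketchKafkaConsumer reference.
    @Deprecated
    @Override
    public String poll(long timeoutMs) {
        return poll(Duration.ofMillis(timeoutMs));
    }

    @Override
    public String poll(Duration timeout) {
        return "polled with " + timeout.toMillis() + "ms"; // placeholder
    }
}
```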

@ConcurrencyPractitioner
Contributor Author

Hi @hachikuji, any more comments?

@ConcurrencyPractitioner
Contributor Author

Hi @vvcephei, I'm not too sure on this point, because @hachikuji told me the deprecation tags were reserved for only close() and poll(), although that is contrary to what was agreed in the KIP.

@hachikuji
Contributor

@vvcephei Yeah, I didn't see in the KIP that the old methods were deprecated and I hadn't expected we would. Do you think there is a good reason to do so?

Contributor

@hachikuji hachikuji left a comment

Thanks @ConcurrencyPractitioner. Left a few more small comments. I think this is close.


/**
* Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and partition.
* Commit offsets returned on the last {@link #poll(long) poll()} for all the subscribed list of topics and partition.
Contributor

This seems inadvertent? We want the documentation to refer to the new poll() API.

* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
*/
public long position(TopicPartition partition, final Duration duration) {
Contributor

nit: can we name the argument timeout? Users will see this name in javadocs, so we should let it be as descriptive as possible. Same for the other methods. To avoid name collisions below, you can use timeoutMs for example.

* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
* @throws org.apache.kafka.common.errors.TimeoutException if the method blocks for longer than requestTimoutMs
Contributor

We need to update this to refer to the timeout argument, not the request timeout.

* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws IllegalArgumentException if the target timestamp is negative
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
Contributor

That is not quite right. We do want them to throw TimeoutException. I was just pointing out that the timeout used is the one passed directly, not the request timeout.

*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
return endOffsets(partitions, Duration.ofMillis(Long.MAX_VALUE));
Contributor

There is some internal inconsistency which is probably causing some confusion. Only position(), commitSync() and committed() have the indefinite blocking behavior. The rest, including this one, should use the request timeout.
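The split described here can be summarized in a small sketch; the method names come from the discussion, but the dispatch table and the 30-second value are illustrative assumptions: position(), commitSync() and committed() keep their historical indefinite blocking, while the metadata-style calls default to request.timeout.ms.

```java
import java.time.Duration;

class DefaultTimeoutSketch {
    static final long REQUEST_TIMEOUT_MS = 30_000L; // illustrative default

    static Duration defaultTimeout(String method) {
        switch (method) {
            case "position":
            case "commitSync":
            case "committed":
                return Duration.ofMillis(Long.MAX_VALUE); // effectively indefinite
            default:
                // endOffsets, beginningOffsets, partitionsFor,
                // listTopics, offsetsForTimes, ...
                return Duration.ofMillis(REQUEST_TIMEOUT_MS);
        }
    }
}
```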


import java.nio.ByteBuffer
import java.util
import java.time.Duration
Contributor

nit: unneeded import. A couple more in ConsumerBounceTest and PlaintextConsumerTest.

@ConcurrencyPractitioner
Contributor Author

@hachikuji do you think this PR is ready?

Contributor

@hachikuji hachikuji left a comment

LGTM. Thanks for the patch. Note I pushed a few minor tweaks to help get this over the line. I will fix the conflicts when I merge.

@hachikuji
Contributor

I will merge in the morning presuming there are no problems with the build.

@hachikuji hachikuji merged commit f24a62d into apache:trunk May 30, 2018
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
(apache#5014)

This patch implements the consumer timeout APIs from KIP-266 (everything except `poll()`, which was done separately).

Reviewers:  John Roesler <john@confluent.io>, Jason Gustafson <jason@confluent.io>