
KAFKA-4024: Fix metadata update to not apply backoff until it experiences failure#1707

Closed
kawamuray wants to merge 18 commits into apache:trunk from kawamuray:KAFKA-4024-metadata-backoff

Conversation

@kawamuray
Contributor

@kawamuray kawamuray commented Aug 6, 2016

Issue: https://issues.apache.org/jira/browse/KAFKA-4024

Fixes a bug that inappropriately applies the retry backoff as the interval between metadata updates even when the current metadata is already outdated.
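The interaction at the heart of the bug can be sketched as follows. This is an illustration, not the real org.apache.kafka.clients.Metadata class; the field names mirror the client code of that era, but the logic is simplified.

```java
// Simplified sketch of how Metadata.timeToNextUpdate combines the metadata
// expiry deadline with the retry backoff. Field names mirror the Kafka client
// code of that era; this is an illustration, not the real class.
public class MetadataSketch {
    long lastRefreshMs;           // last update attempt, successful or not
    long lastSuccessfulRefreshMs; // last successful update
    long refreshBackoffMs = 100;  // retry.backoff.ms
    long metadataExpireMs = 300_000;
    boolean needUpdate;

    long timeToNextUpdate(long nowMs) {
        long timeToExpire = needUpdate ? 0
                : Math.max(lastSuccessfulRefreshMs + metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = Math.max(lastRefreshMs + refreshBackoffMs - nowMs, 0);
        // The backoff term wins even when an update is urgently needed, which is
        // what delays the very first fetch when the bootstrap "update" was
        // stamped with the construction time.
        return Math.max(timeToExpire, timeToAllowUpdate);
    }
}
```

Seeding the bootstrap update with the current timestamp makes `lastRefreshMs + refreshBackoffMs` land in the future, so the first real fetch waits out the backoff even though nothing has failed yet.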

@kawamuray kawamuray changed the title KAFKA-4024 KafkaProducer should not initialize metadata with current timestamp KAFKA-4024: KafkaProducer should not initialize metadata with current timestamp Aug 6, 2016
@kawamuray kawamuray force-pushed the KAFKA-4024-metadata-backoff branch from fc02b23 to 02cbc6d on September 2, 2016 14:30
@kawamuray
Contributor Author

I think I'll follow up with more unit tests.

@kawamuray kawamuray changed the title KAFKA-4024: KafkaProducer should not initialize metadata with current timestamp KAFKA-4024: Fix metadata update to not apply backoff until it experiences failure Sep 2, 2016
@kawamuray kawamuray force-pushed the KAFKA-4024-metadata-backoff branch from ee2eceb to 2e44c9d on September 6, 2016 02:27
@kawamuray
Contributor Author

@ijuma Can you review this please?

Member

@ijuma ijuma Sep 6, 2016

Do we need a comment? It seems obvious from the code, no?

Contributor Author

Well, when I first read this method it was a bit hard to understand which value would be adopted, so I thought small comments would help new readers, but I'm fine with removing them if you don't think they're helpful.

@ijuma
Member

ijuma commented Sep 6, 2016

Thanks for the PR, LGTM (apart from some minor comments). The documentation for retry.backoff.ms says that it only applies to failed requests:

"The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios."

@hachikuji, can you double-check that this change is fine?

@ijuma
Member

ijuma commented Sep 6, 2016

One more thing, do we want to make the first Metadata.update call consistent between KafkaProducer and KafkaConsumer?

The producer does:

this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

The consumer does:

this.metadata.update(Cluster.bootstrap(addresses), 0);
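The practical difference between the two seeds can be shown with a little arithmetic. `waitBeforeFirstUpdate` is a hypothetical helper written for illustration, not a real client method:

```java
// Hypothetical helper showing the effect of the bootstrap timestamp on the
// first metadata fetch; not part of the real client API.
public class BootstrapSeed {
    // Time the first update must wait before the refresh backoff has elapsed.
    static long waitBeforeFirstUpdate(long seedMs, long nowMs, long backoffMs) {
        return Math.max(seedMs + backoffMs - nowMs, 0);
    }

    public static void main(String[] args) {
        long backoffMs = 100;
        // Producer-style seed (time.milliseconds() at construction): the very
        // first update is held back by the full retry backoff.
        System.out.println(waitBeforeFirstUpdate(5_000, 5_000, backoffMs));
        // Consumer-style seed (0): the backoff has long since elapsed.
        System.out.println(waitBeforeFirstUpdate(0, 5_000, backoffMs));
    }
}
```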

@kawamuray
Contributor Author

One more thing, do we want to make the first Metadata.update call consistent between KafkaProducer and KafkaConsumer?

Yeah, actually this is how this PR first looked, since I thought the problem happened only on the first send.
After this fix it is no longer harmful, but let's fix it anyway to keep the usages consistent.

@kawamuray kawamuray force-pushed the KAFKA-4024-metadata-backoff branch from 5aa67ce to 8d59a9e on September 6, 2016 14:46
@hachikuji
Contributor

@kawamuray There are a bunch of cases in the clients where we can refetch metadata in a tight loop (e.g. if a partition leader is not available). The metadata fetches are successful, but we are waiting for a condition inside the response to become true. Previously the backoff protected us in this case, but we seem to be losing that, right? Is this not a concern?

@ijuma
Member

ijuma commented Sep 7, 2016

@hachikuji That's a good point. If we want to continue using it in those cases, we probably need to update the config documentation.

@hachikuji
Contributor

@ijuma Yeah, agreed. Seems like the problem is that the definition of "failed request" might not be too clear in the documentation. I think what we really mean is any metadata fetch containing errors, which covers topics not existing, partition leaders not available, etc. These cases are considered "successful" updates from the perspective of the Metadata object, but not from that of the producer/consumer. I think there are two problems with the current implementation:

  1. We treat the bootstrapped cluster as a regular metadata update and enforce the backoff, which causes the weird delay on startup. This should be easy to fix by providing a separate path for bootstrapping the metadata.
  2. The current implementation takes the interpretation of the backoff setting to the extreme and prevents any repeat metadata fetches within each backoff duration. Perhaps we can loosen this and override the backoff interval if the set of topics being fetched changes? In fact, if we did this, then the first problem would be solved as a matter of course.

Thoughts?
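Suggestion 2 above could be sketched roughly like this. The class and method names are illustrative stand-ins, not the real Metadata API:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch of suggestion 2: bypass the refresh backoff whenever the set of
// topics being fetched changes. Class and method names are illustrative,
// not the real Metadata API.
public class TopicsTracker {
    private final Set<String> topics = new HashSet<>();
    private final long refreshBackoffMs;
    private long lastRefreshMs = Long.MIN_VALUE / 2; // effectively "never refreshed"

    TopicsTracker(long refreshBackoffMs) {
        this.refreshBackoffMs = refreshBackoffMs;
    }

    // May a metadata fetch for these topics be sent now?
    boolean mayFetch(long nowMs, String... requested) {
        Set<String> requestedSet = new HashSet<>(Arrays.asList(requested));
        if (!topics.equals(requestedSet)) {
            // Topic set changed since the last refresh: override the backoff.
            topics.clear();
            topics.addAll(requestedSet);
            return true;
        }
        return nowMs >= lastRefreshMs + refreshBackoffMs;
    }

    void recordFetch(long nowMs) {
        lastRefreshMs = nowMs;
    }
}
```

Because the bootstrap cluster starts with an empty topic set, the first real fetch always sees a changed set and skips the backoff, which is why this would also resolve problem 1 as a matter of course.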

@kawamuray
Contributor Author

kawamuray commented Sep 7, 2016

@hachikuji Thanks for your comment.
"If a partition leader is not available" means a leader broker went down but the metadata hasn't been updated on the broker side yet, so the fetched metadata still points to the dead broker as the leader of the topic-partition, right?
Indeed, in that case a client can keep sending metadata update requests each time it fails to produce to the dead leader, but the interval between metadata update requests is still controlled by reconnect.backoff.ms IIUC.
When the producer tries to send the next message that would be routed to the dead broker, it tries to re-initiate the connection to the dead broker, fails, and reconnect.backoff.ms applies.
Out of curiosity, how is it harmful if clients continually send metadata requests in a tight loop during broker failover?

@hachikuji
Contributor

hachikuji commented Sep 7, 2016

@kawamuray Hmm.. interesting point. We do get some protection from the reconnect backoff in some cases, but I'm still a little uneasy in general about changing the backoff behavior. The metadata requests can be harmful, especially in some cases for the consumer where we have to fetch metadata for all topics (i.e. regex subscriptions). We've seen cases where a bunch of clients fetching metadata in a tight loop have brought the cluster to its knees.

What do you think about my suggestions above? We essentially keep the current behavior, but we bypass the backoff if the set of topics has changed since the last refresh.

@kawamuray
Contributor Author

@hachikuji

but we bypass the backoff if the set of topics has changed since the last refresh.

Hmm, I think this approach still doesn't work for the following case:

  1. Successful metadata update.
  2. Successful metadata update again, triggered by metadata expiration. The fetched metadata most likely hasn't changed since last time, so refreshBackoffMs isn't overridden.
  3. BrokerA fails immediately after 2.
  4. The client wants to send a metadata update request immediately but is blocked by refreshBackoffMs before it is allowed to.

@hachikuji
Contributor

@kawamuray I think a longer delay in responding to failures is basically the price you have to pay to avoid overwhelming the servers. It takes a little time on the server side to detect the failure and migrate partitions anyway, so I don't think immediately sending a metadata update after the failure would have much value, though I could be wrong. I'd prefer for now to leave the current backoff behavior intact except when we know something in the request has changed. Perhaps it would make sense to consider in a follow-up issue whether something like exponential backoff would be a more ideal way to address this problem in practice.

@kawamuray
Contributor Author

kawamuray commented Sep 9, 2016

@hachikuji

It takes a little time on the server side to detect the failure and migrate partitions anyway

I think this isn't true in some cases. For example, when we perform a rolling restart of brokers, each broker shuts down gracefully, so the controller can detect the shut-down broker much faster than waiting for the ZK session timeout.

so I don't think immediately sending a metadata update after the failure would have much value

I do agree with this, though. At least in the case of broker failure, the first immediate metadata request likely returns unchanged metadata, which makes the producer wait at least retry.backoff.ms to recover anyway.

Actually, I'm a bit confused now. The reason I lengthened retry.backoff.ms was to prevent a producer from sending retried Produce requests in a tight loop and exhausting its retry attempts without waiting long enough for broker recovery, but the actual behavior seems different from what I expected.
When I shortened retry.backoff.ms, on each Sender#run it tries to re-initiate the connection to the dead leader, so NetworkClient#ready doesn't return true for the unavailable topic-partition's leader node, and the retry count isn't decremented in that attempt. Even on the next loop, the Selector already knows that the dead leader has disconnected, so the same thing happens, and the producer won't exhaust its retry attempts during this loop. Also, WARN logs such as "Got error produce response with correlation id ..." won't flood the logs.

So is the first case (the first producer.send() blocked inappropriately) the only problem, as long as we keep retry.backoff.ms reasonably small?
In that case, is there any reason we should still follow your idea (do not apply backoff when the last result changed)? Or is the single line of change - metadata.update(..., 0) - enough to fix this?

Anyway, I strongly agree that it would be much better to have exponential backoff, or - just an idea - something like a "listen to metadata update" mechanism, which doesn't sound that hard to implement since we already have the purgatory implementation on the server side, in order to minimize MTTR. WDYT?

@kawamuray
Contributor Author

@hachikuji
Ugh, I think I didn't understand your idea properly.

Perhaps we can loosen this and override the backoff interval if the set of topics being fetched changes

I read through your comments again and realized that by "set of topics" you meant the topics field of Metadata. Then your idea would prevent this:

producer.send(new ProducerRecord("topic-A", ...));
producer.send(new ProducerRecord("topic-B", ...)); // may be blocked by refreshBackoffMs

Of course it prevents the first problem as well.
Sounds good to try. Let me update this PR to apply your suggestion.

@hachikuji
Contributor

@kawamuray Yeah, sounds good, sorry if my suggestion wasn't clear at first (I've been looking at this code a little too long so I just assume everyone else has as well). Also good to know that we won't exhaust the retry limit for produce requests when dealing with partition leader failover.

@kawamuray kawamuray force-pushed the KAFKA-4024-metadata-backoff branch from 8d59a9e to 64ec2ec on September 13, 2016 07:32
@kawamuray
Contributor Author

@hachikuji Updated the PR based on your suggestion.
9c24e2d is the commit which implements your idea, but let me explain this new patch a bit, as I had to make an unexpected modification (64ec2ec) to accomplish the purpose.

I modified the test code like this: https://gist.github.com/kawamuray/fdfcfec85f179417c3a2d4866f055df5 and ran it against 9c24e2d; the result looked like this:

./bin/kafka-run-class.sh -Dlog4j.configuration=file:./log4j.properties -Dretry.backoff.ms=5000 KafkaProducerMetadataUpdateDurationTest
...
[2016-09-13 16:07:53,676] INFO Kafka version : 0.10.1.0-SNAPSHOT (org.apache.kafka.common.utils.AppInfoParser)
[2016-09-13 16:07:53,676] INFO Kafka commitId : 9c24e2d4025e5170 (org.apache.kafka.common.utils.AppInfoParser)
[2016-09-13 16:07:53,679] DEBUG Initialize connection to node -1 for sending metadata request (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:53,679] DEBUG Initiating connection to node -1 at HOST-1:9022. (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:53,731] DEBUG Initialize connection to node -2 for sending metadata request (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:53,732] DEBUG Initiating connection to node -2 at HOST-2:9022. (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:53,732] DEBUG Initialize connection to node -3 for sending metadata request (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:53,732] DEBUG Initiating connection to node -3 at HOST-3:9022. (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:53,737] DEBUG Completed connection to node -1 (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:53,738] DEBUG Completed connection to node -2 (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:53,738] DEBUG Completed connection to node -3 (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:58,751] DEBUG Sending metadata request {topics=[test]} to node -3 (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:58,786] DEBUG Updated cluster metadata version 2 to Cluster(nodes = [HOST-3:9022 (id: 3 rack: null), HOST-1:9022 (id: 1 rack: null), HOST-2:9022 (id: 2 rack: null)], partitions = [Partition(topic = test, partition = 1, leader = 1, replicas = [1,2,3,], isr = [2,3,1,]), Partition(topic = test, partition = 0, leader = 3, replicas = [1,2,3,], isr = [3,2,1,]), Partition(topic = test, partition = 2, leader = 2, replicas = [1,2,3,], isr = [3,2,1,])]) (org.apache.kafka.clients.Metadata)
[2016-09-13 16:07:58,799] DEBUG Initiating connection to node 1 at HOST-1:9022. (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:58,799] DEBUG Initialize connection to node 3 for sending metadata request (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:58,799] DEBUG Initiating connection to node 3 at HOST-3:9022. (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:58,799] DEBUG Initiating connection to node 2 at HOST-2:9022. (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:58,801] DEBUG Completed connection to node 1 (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:58,803] DEBUG Completed connection to node 3 (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:07:58,803] DEBUG Completed connection to node 2 (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:08:03,801] DEBUG Sending metadata request {topics=[test,kawamuray]} to node 1 (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:08:03,807] DEBUG Updated cluster metadata version 3 to Cluster(nodes = [HOST-2:9022 (id: 2 rack: null), HOST-3:9022 (id: 3 rack: null), HOST-1:9022 (id: 1 rack: null)], partitions = [Partition(topic = test, partition = 1, leader = 1, replicas = [1,2,3,], isr = [2,3,1,]), Partition(topic = kawamuray, partition = 1, leader = 2, replicas = [1,2,3,], isr = [3,2,1,]), Partition(topic = test, partition = 0, leader = 3, replicas = [1,2,3,], isr = [3,2,1,]), Partition(topic = kawamuray, partition = 0, leader = 1, replicas = [1,2,3,], isr = [2,3,1,]), Partition(topic = test, partition = 2, leader = 2, replicas = [1,2,3,], isr = [3,2,1,]), Partition(topic = kawamuray, partition = 2, leader = 3, replicas = [1,2,3,], isr = [3,2,1,])]) (org.apache.kafka.clients.Metadata)
tSendTest1 = 5121, tSendTest2 = 0, tSendKawamuray = 5008
[2016-09-13 16:08:03,807] INFO Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)

This result has 2 problems:

  1. It still blocks producer.send().
  2. NetworkClient initiates connections to all brokers before sending the first metadata request.

I dug into DefaultMetadataUpdater and found that timeToNextReconnectAttempt is causing this.

The cause of problem 1 is that DefaultMetadataUpdater updates lastNoNodeAvailableMs even while a connection to a broker is being established, which imposes a retry.backoff.ms interval between maybeUpdate() calls.
The cause of problem 2 is that DefaultMetadataUpdater doesn't wait for the connection initiated in the previous loop to finish being established. By changing maybeUpdate() to wait for the connection properly, maybeUpdate() is no longer risky to call in a tight loop, so each call can check whether a connection is available for sending the metadata request.
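The corrected decision described here can be sketched as a simple state-to-timeout mapping. NodeState and the constants are illustrative stand-ins for ClusterConnectionStates and the real client configs, not the actual patch:

```java
// Illustrative sketch of the fixed maybeUpdate() decision: an in-progress
// connection is waited on instead of being treated as "no node available".
// NodeState and the constants stand in for ClusterConnectionStates and the
// real client configs.
public class MaybeUpdateSketch {
    enum NodeState { CONNECTED, CONNECTING, DISCONNECTED, NO_NODE_AVAILABLE }

    static final long RETRY_BACKOFF_MS = 5_000;
    static final long RECONNECT_BACKOFF_MS = 50;

    static long pollTimeout(NodeState state) {
        switch (state) {
            case CONNECTED:
                return 0;                    // ready: send the metadata request now
            case CONNECTING:
                return RECONNECT_BACKOFF_MS; // the fix: wait on the in-flight
                                             // connection, don't stamp
                                             // lastNoNodeAvailableMs
            case DISCONNECTED:
                return RECONNECT_BACKOFF_MS; // initiate a connection, then wait
            default:
                return RETRY_BACKOFF_MS;     // genuinely no node: back off
        }
    }
}
```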

After applying 64ec2ec, the experiment result looks like this:

./bin/kafka-run-class.sh -Dlog4j.configuration=file:./log4j.properties -Dretry.backoff.ms=5000 KafkaProducerMetadataUpdateDurationTest
...
[2016-09-13 16:25:18,526] INFO Kafka version : 0.10.1.0-SNAPSHOT (org.apache.kafka.common.utils.AppInfoParser)
[2016-09-13 16:25:18,526] INFO Kafka commitId : 8cd55d2a465e3b0a (org.apache.kafka.common.utils.AppInfoParser)
[2016-09-13 16:25:18,528] DEBUG Initialize connection to node -1 for sending metadata request (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:25:18,528] DEBUG Initiating connection to node -1 at HOST-1:9022. (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:25:18,593] DEBUG Completed connection to node -1 (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:25:18,612] DEBUG Sending metadata request {topics=[test]} to node -1 (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:25:18,644] DEBUG Updated cluster metadata version 2 to Cluster(nodes = [HOST-1:9022 (id: 1 rack: null), HOST-2:9022 (id: 2 rack: null), HOST-3:9022 (id: 3 rack: null)], partitions = [Partition(topic = test, partition = 1, leader = 1, replicas = [1,2,3,], isr = [2,3,1,]), Partition(topic = test, partition = 0, leader = 3, replicas = [1,2,3,], isr = [3,2,1,]), Partition(topic = test, partition = 2, leader = 2, replicas = [1,2,3,], isr = [3,2,1,])]) (org.apache.kafka.clients.Metadata)
[2016-09-13 16:25:18,657] DEBUG Initiating connection to node 1 at HOST-1:9022. (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:25:18,658] DEBUG Initiating connection to node 3 at HOST-3:9022. (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:25:18,658] DEBUG Initiating connection to node 2 at HOST-2:9022. (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:25:18,661] DEBUG Completed connection to node 1 (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:25:18,662] DEBUG Sending metadata request {topics=[test,kawamuray]} to node 1 (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:25:18,662] DEBUG Completed connection to node 3 (org.apache.kafka.clients.NetworkClient)
[2016-09-13 16:25:18,667] DEBUG Updated cluster metadata version 3 to Cluster(nodes = [HOST-3:9022 (id: 3 rack: null), HOST-2:9022 (id: 2 rack: null), HOST-1:9022 (id: 1 rack: null)], partitions = [Partition(topic = test, partition = 1, leader = 1, replicas = [1,2,3,], isr = [2,3,1,]), Partition(topic = kawamuray, partition = 1, leader = 2, replicas = [1,2,3,], isr = [3,2,1,]), Partition(topic = test, partition = 0, leader = 3, replicas = [1,2,3,], isr = [3,2,1,]), Partition(topic = kawamuray, partition = 0, leader = 1, replicas = [1,2,3,], isr = [2,3,1,]), Partition(topic = test, partition = 2, leader = 2, replicas = [1,2,3,], isr = [3,2,1,]), Partition(topic = kawamuray, partition = 2, leader = 3, replicas = [1,2,3,], isr = [3,2,1,])]) (org.apache.kafka.clients.Metadata)
[2016-09-13 16:25:18,667] DEBUG Completed connection to node 2 (org.apache.kafka.clients.NetworkClient)
tSendTest1 = 130, tSendTest2 = 0, tSendKawamuray = 9
[2016-09-13 16:25:18,667] INFO Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)

@kawamuray
Contributor Author

@hachikuji ping.

@hachikuji
Contributor

@kawamuray Thanks for the reminder. I'll look at this in the morning.

Contributor

@hachikuji hachikuji left a comment

@kawamuray Left a couple comments. It's a nice find that the current client does a bunch of unnecessary connection attempts when fetching metadata initially.

Contributor

@hachikuji hachikuji Sep 16, 2016

Can you split this into several separate test cases? It makes the tests more useful (since they are more focused) and easier to maintain. At a minimum, you can have different cases for Metadata.setTopics, Metadata.add, and Metadata.needMetadataForAllTopics.

Contributor Author

OK will do.

Contributor

Maybe we should have an isDisconnected method in ClusterConnectionStates?

Contributor Author

ack.

Contributor

@hachikuji hachikuji Sep 17, 2016

Wouldn't it make more sense to check lastConnectAttemptNodeId in maybeUpdate(long now) before finding a new node with leastLoadedNode?

Contributor Author

Hmm, is it possible?
I guess it doesn't work in the following case:

  1. We have two nodes: Node-A (disconnected) and Node-B (connected, in-flight requests = 1).
  2. leastLoadedNode() returns Node-A, which isn't connected yet, so we initiate a connection to Node-A.
  3. On the next poll(), a response from Node-B is received, so its in-flight request count is now 0.
  4. On the next maybeUpdate(), leastLoadedNode() returns Node-B.

Contributor

@hachikuji hachikuji Sep 17, 2016

This is a slight change in behavior, right? Previously if we cannot find any node to connect to, we'll block in poll() for the retry backoff. Now it seems we'll end up calling poll() in a tight loop until we can find a node to connect to. Maybe the logic in maybeUpdate needs to be something like this:

  1. Check if there is a connection in progress. If so, return the reconnect backoff as the timeout.
  2. If there is no connection in progress, call leastLoadedNode to find some node to connect to. If this returns null, return retry backoff as is currently done.
  3. Next, if we have a node to connect to, then we initiate the connect (if disconnected) and set lastConnectAttemptNodeId.
  4. Finally, if already connected, send the metadata request.

Would that keep the current backoff behavior while avoiding the unnecessary connection attempts?
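The four steps above might look like the following sketch. The Node type, its state field, and the backoff constants are made up for illustration, not the real NetworkClient API:

```java
import java.util.Arrays;
import java.util.Comparator;

// Sketch of the four-step maybeUpdate ordering proposed above. Node, its
// state, and the backoff constants are illustrative, not the real
// NetworkClient types.
public class MaybeUpdateOrder {
    enum State { CONNECTED, CONNECTING, DISCONNECTED }

    static class Node {
        final State state;
        final int inFlightRequests;
        Node(State state, int inFlightRequests) {
            this.state = state;
            this.inFlightRequests = inFlightRequests;
        }
    }

    static final long RETRY_BACKOFF_MS = 5_000;
    static final long RECONNECT_BACKOFF_MS = 50;
    static final long SEND_NOW = 0;

    static long maybeUpdate(Node... nodes) {
        // Step 1: if a connection is already in progress, just wait for it.
        if (Arrays.stream(nodes).anyMatch(n -> n.state == State.CONNECTING))
            return RECONNECT_BACKOFF_MS;
        // Step 2: otherwise pick the least-loaded node; none means retry backoff.
        Node candidate = Arrays.stream(nodes)
                .min(Comparator.comparingInt(n -> n.inFlightRequests))
                .orElse(null);
        if (candidate == null)
            return RETRY_BACKOFF_MS;
        // Step 3: a disconnected candidate gets a connection attempt first.
        if (candidate.state == State.DISCONNECTED)
            return RECONNECT_BACKOFF_MS;
        // Step 4: a connected candidate can take the metadata request now.
        return SEND_NOW;
    }
}
```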

Contributor Author

@kawamuray kawamuray Sep 19, 2016

You're right.
Even though maybeUpdate() returns the maximum time to wait as a backoff, a network event (i.e. connection establishment or receiving a response) can trigger immediate progress to the next step.
I basically agree with your four steps, except for one thing about step 2: whenever leastLoadedNode returns null, that means all nodes are currently backing off, right? In that case I guess we should return the minimum remaining reconnect backoff time across all nodes. I will add a follow-up commit based on this; let's discuss further on the updated patch if necessary.

Thanks for the nice catch.

Contributor Author

@kawamuray kawamuray Oct 31, 2016

Hmm, after I added fdb0984, I found that a test started failing.

org.apache.kafka.clients.consumer.KafkaConsumerTest > testChangingRegexSubscription FAILED
    java.lang.IllegalStateException: Next in line response did not match expected request
        at org.apache.kafka.clients.MockClient.send(MockClient.java:147)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:392)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:223)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:308)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:277)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:259)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1012)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:978)
        at org.apache.kafka.clients.consumer.KafkaConsumerTest.testChangingRegexSubscription(KafkaConsumerTest.java:623)

This is because the current regex-based subscription relies on Metadata.Listener to update the list of topics it is subscribed to: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L176

So this line still isn't removable even after applying fdb0984.
However, it might still make sense to keep the change, as it removes a few LOC in other situations, and to keep calling requestUpdate() explicitly for cases like this.
I was on the fence about reverting fdb0984, since I think it splits the responsibility of calling requestUpdate() between the caller and Metadata, but I just added 944f9f8 instead. I'd still like to hear your opinion, @hachikuji.

Contributor

Good catch! Yeah, I forgot that we should always request a new metadata update when changing the regex subscription, even though it doesn't change anything about the topics we are fetching metadata for.

@kawamuray kawamuray force-pushed the KAFKA-4024-metadata-backoff branch from 2e93b75 to 944f9f8 on October 31, 2016 07:20
@hachikuji
Contributor

LGTM. Thanks for the patch! I'm going to run some system tests prior to merging to make sure we didn't miss anything obvious, but hopefully we can wrap this up shortly.

@hachikuji
Contributor

hachikuji commented Nov 1, 2016

Quick update, there were some system test failures: http://testing.confluent.io/confluent-kafka-branch-builder-system-test-results/?prefix=2016-10-31--001.1477945677--hachikuji--KAFKA-4024--944f9f8/. I see a lot of messages like this from the consumer:

[2016-10-31 19:58:09,501] TRACE Skipping fetch for partition test_topic-0 because there is an in-flight request to worker3:9092 (id: 2 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)

Haven't had time yet to dig into it to see if it is related to any changes in this patch.

@kawamuray
Contributor Author

kawamuray commented Nov 2, 2016

Doh! super good catch @hachikuji .

I ran the system test locally and it reproduced exactly.
I investigated and found that we made another mistake WRT .requestUpdate() 😢

I turned on TRACE logging of VerifiableProducer and found that it stops requesting metadata updates after it receives the following response from the broker, which is incomplete since the leader of partition 0 is missing; it then never resumes producing the accumulated records even though the broker failed over successfully:

[2016-11-02 07:48:12,455] DEBUG Updated cluster metadata version 68 to Cluster(id = hbf5ulUERN642ViwSrvdxQ, nodes = [worker1:9092 (id: 2 rack: null)], partitions = [Partition(topic = test_topic, partition = 0, leader = none, replicas = [1,2,], isr = [])]) (org.apache.kafka.clients.Metadata)

This is because we made this change: fdb0984#diff-499d1af91cfaf42966a8e2e485a1b4b9L180

Metadata.add() triggers requestUpdate() whenever the added topic is missing from the current list, but in this case the topic-partition had already been added previously, so no one calls requestUpdate().
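The gap can be illustrated with a small predicate. This is a sketch of the intent of the eventual fix (48f598f) with made-up names, not the actual patch:

```java
// Sketch of the intent of 48f598f: a metadata response should trigger another
// refresh not only for a newly added topic, but also whenever a known
// partition has no leader. Names here are made up for illustration.
public class RefreshCheck {
    // leadersByPartition[i] is the leader broker id of partition i, or null if
    // the partition currently has no leader.
    static boolean needsRefresh(boolean topicIsNew, Integer... leadersByPartition) {
        if (topicIsNew)
            return true;                 // pre-fix behavior: new topic only
        for (Integer leader : leadersByPartition)
            if (leader == null)
                return true;             // the fix: leaderless partition
        return false;
    }
}
```

Without the leaderless-partition check, the incomplete cluster response above (partition 0 with leader = none) would never prompt another fetch.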

@kawamuray
Contributor Author

Restored requestUpdate() for unknown-leader partitions in 48f598f, and the system tests are now passing.
So in the end fdb0984 couldn't remove even a single line from the production code :(

@hachikuji
Contributor

@kawamuray Good find and thanks for investigating.

@asfgit asfgit closed this in e795ad9 Nov 4, 2016
@hachikuji
Contributor

Merged to trunk. Thanks again for the very delicate patch. We'll have to keep an eye on the nightly builds to make sure we didn't miss anything. 😅

@kawamuray
Contributor Author

We had a long run :D Thanks so much for your insightful review @hachikuji!

@hachikuji
Contributor

Oh, forgot to mention. Prior to merging, I added a little extra explanation to one of the comments in the code. Hopefully it's reasonable.
