Skip to content

KAFKA-13834: fix drain batch starving issue#12066

Merged
showuon merged 15 commits intoapache:trunkfrom
ruanliang-hualun:trunk
Apr 21, 2022
Merged

KAFKA-13834: fix drain batch starving issue#12066
showuon merged 15 commits intoapache:trunkfrom
ruanliang-hualun:trunk

Conversation

@ruanliang-hualun
Copy link
Copy Markdown
Contributor

Maintains a drainIndex for each node

Maintains a drainIndex for each node
Copy link
Copy Markdown
Member

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruanliang-hualun , thanks for the PR. Left some comments. In addition to that, you also need:

  1. add tests
  2. update PR title to the pattern like this: "KAFKA-13834: fix drain batch starving issue"

// The following variables are only accessed by the sender thread, so we don't need to protect them.
private final Set<TopicPartition> muted;
private int drainIndex;
private Map<String,Integer> nodesDrainIndex;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: need a space between String, and Integer Map<String, Integer>

/* to make starvation less likely this loop doesn't start at 0 */
int drainIndex = getDrainIndex(node.idString());
int start = drainIndex = drainIndex % parts.size();
updateDrainIndex(node.idString(),drainIndex);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can update Drain Index at the end of drainBatchesForOneNode. We don't need to update it each time.

this.drainIndex = (this.drainIndex + 1) % parts.size();

drainIndex = (drainIndex + 1) % parts.size();
updateDrainIndex(node.idString(),drainIndex);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, don't need to update it each time.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the advice , i will modify and resubmit

@ruanliang-hualun ruanliang-hualun changed the title bugfix for KAFKA-13834 KAFKA-13834: fix drain batch starving issue Apr 19, 2022
Copy link
Copy Markdown
Contributor

@RivenSun2 RivenSun2 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Left a comment.

import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.*;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do not import *.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the advice, I've already corrected it

Copy link
Copy Markdown
Member

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ruanliang-hualun , thanks for the test. Left some comments. Thanks.

import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove this line

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this line. Thanks.

// The following variables are only accessed by the sender thread, so we don't need to protect them.
private final Set<TopicPartition> muted;
private int drainIndex;
private Map<String, Integer> nodesDrainIndex;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private final Map<String, Integer> nodesDrainIndex;

@@ -560,12 +561,11 @@ private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, i
List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
List<ProducerBatch> ready = new ArrayList<>();
/* to make starvation less likely this loop doesn't start at 0 */
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this comment still apply now?

return ready;
}

int getDrainIndex(String idString) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private method, please

return nodesDrainIndex.computeIfAbsent(idString, s -> 0);
}

void updateDrainIndex(String idString, int drainIndex) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private method, please

judgeValidTp(batchss2,tp2,tp4);
}

@SuppressWarnings("unchecked")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why unchecked?

}

@SuppressWarnings("unchecked")
private void judgeValidTp(Map<Integer, List<ProducerBatch>> batches1, TopicPartition... tp) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

method name is not clear. How about verifyTopicPartitionInBatches?
And the argument name, batches1 -> batches


@SuppressWarnings("unchecked")
private void judgeValidTp(Map<Integer, List<ProducerBatch>> batches1, TopicPartition... tp) {
assertEquals(tp.length,batches1.entrySet().size());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between batches1.entrySet().size() and batches1.size()?

@SuppressWarnings("unchecked")
private void judgeValidTp(Map<Integer, List<ProducerBatch>> batches1, TopicPartition... tp) {
assertEquals(tp.length,batches1.entrySet().size());
List<ProducerBatch> list = new ArrayList();;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

additional ;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the variable name, maybe topicPartitionsInBatch is much clear?

List<ProducerBatch> list = new ArrayList();;
for (Map.Entry<Integer, List<ProducerBatch>> entry : batches1.entrySet()) {
List<ProducerBatch> batches = entry.getValue();
list.add(batches.get(0));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also verify the batches size is 1 here, before adding into list

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add the topoicPartition only here? ex:
list.add(batches.get(0).topicPartition);

So in the follow assertion, we can just do:
assertArrayEquals(list, tp)

WDYT?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your advice, I benefited a lot

ruanliang01 added 4 commits April 20, 2022 15:37
@guozhangwang
Copy link
Copy Markdown
Contributor

@ruanliang-hualun Thanks for the PR, and thanks to @showuon for the detailed reviews!! I took a look at the edge test and I think without the actual fix it would indeed cause starvation. I do not have any further comments, and would leave to @showuon to finally approve and merge it :)

Copy link
Copy Markdown
Member

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. Left some comments. Thanks.

import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this line. Thanks.

// drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
// The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
verifyTopicPartitionInBatches(batchss2, tp2, tp4);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can add another line to make sure it'll pick tp1, tp3 in next run. That is:

        // drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
        // The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
        Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
        verifyTopicPartitionInBatches(batchss2, tp2, tp4);

        // make sure in next run, the drain index will start from the beginning
        Map<Integer, List<ProducerBatch>> batches3 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
        verifyTopicPartitionInBatches(batches3, tp1, tp3);

Comment on lines 567 to 571
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
this.drainIndex = (this.drainIndex + 1) % parts.size();

// Only proceed if the partition has no in-flight batches.
if (isMuted(tp))
continue;
Copy link
Copy Markdown
Member

@showuon showuon Apr 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have a 2nd look, I found this change is not right. If the tp is muted or other reason, it continued, and we'll have a infinite loop because the drainIdex won't get incremented, right? Please fix it and add a test for this case. Thanks.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right , i fix the problem please recheck


// drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
assertEquals(2, batches1.size());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to verify the size because we'll verify it in verifyTopicPartitionInBatches, right?


// drain batches from 2 nodes: node1 => tp2, node2 => tp4, because the max request size is full after the first batch drained
// The drain index should start from next topic partition, that is, node1 => tp2, node2 => tp4
Map<Integer, List<ProducerBatch>> batchss2 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: batchss2 -> batches2

List<TopicPartition> topicPartitionsInBatch = new ArrayList<TopicPartition>();
for (Map.Entry<Integer, List<ProducerBatch>> entry : batches.entrySet()) {
List<ProducerBatch> batchList = entry.getValue();
assertEquals(batchList.size(), 1);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method semantic is assertEquals(expected, actual)
So, we should do: assertEquals(1, batchList.size());, right?

}

for (int i = 0 ; i < tp.length ; i++){
assertEquals(topicPartitionsInBatch.get(i), tp[i]);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here:
assertEquals(tp[i], topicPartitionsInBatch.get(i));

}

private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> batches, TopicPartition... tp) {
assertEquals(tp.length,batches.size());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add space after comma (,)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need space after comma (,)

PartitionInfo part = parts.get(drainIndex);
TopicPartition tp = new TopicPartition(part.topic(), part.partition());
updateDrainIndex(node.idString(), drainIndex);
drainIndex = (drainIndex + 1) % parts.size();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update the drainIndex before increase drainIndex is necessary

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, and please add a test for the continue case. Thanks.

TopicPartition tp = new TopicPartition(part.topic(), part.partition());
this.drainIndex = (this.drainIndex + 1) % parts.size();

updateDrainIndex(node.idString(), drainIndex);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think we can updateDrainIndex after while loop. After all, we only use the drainIndex variable inside while loop, not related to nodesDrainIndex map, right?

Copy link
Copy Markdown
Contributor Author

@ruanliang-hualun ruanliang-hualun Apr 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the drainIndex should be related to tp, take a look at the following example(node has two tp, tp-0 tp-1)

do {
   drainIndex = 1 ;
   tp = tp-1;
   // update drainIndex to 0 without updateDrainIndex(1)  before this
   drainIndex = (drainIndex + 1) % parts.size(); 
   synchronized (deque) {
      // satisfy the break condition
      break;
    }
}
while (start != drainIndex)
// updateDrainIndex(drainIndex = 0);    (the drainIndex should be 1 right? so the next drainBatch will begin with tp-1)
updateDrainIndex(drainIndex)

and in my test case, if updateDrainIndex after while loop. test will fail

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. Make sense.

Copy link
Copy Markdown
Member

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. Looks better. Left some minor comments.

TopicPartition tp = new TopicPartition(part.topic(), part.partition());
this.drainIndex = (this.drainIndex + 1) % parts.size();

updateDrainIndex(node.idString(), drainIndex);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. Make sense.

verifyTopicPartitionInBatches(batches3, tp1, tp3);

// test the contine case, mute the tp4 and drain batches from 2nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)
// add record for tp2, tp3, tp4 mute the tp4
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the 1st comment: // test the contine case,...

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: // add record for tp2, tp3, tp4, [and] mute the tp4

accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
accum.mutePartition(tp4);
Map<Integer, List<ProducerBatch>> batches4 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add the above comment here, and change to
drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the detailed reviews, I learned a lot

}

private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> batches, TopicPartition... tp) {
assertEquals(tp.length,batches.size());
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need space after comma (,)

Copy link
Copy Markdown
Member

@showuon showuon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks for the fix! Let's wait for the jenkins tests completion.


for (int i = 0 ; i < tp.length ; i++){
for (int i = 0; i < tp.length; i++) {
assertEquals(tp[i], topicPartitionsInBatch.get(i));
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clients:checkstyleTest FAILED, I just change the code style

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. Let's wait for the jenkins build results! :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@showuon jenkins build result comes out, the error seems uncorrelated, please have a check thanks alot

@showuon
Copy link
Copy Markdown
Member

showuon commented Apr 21, 2022

Failed tests are unrelated.

    Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testSetLog4jConfigurations()
    Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testLegacyAlterConfigs()
    Build / JDK 8 and Scala 2.12 / kafka.server.KRaftClusterTest.testCreateClusterAndPerformReassignment()
    Build / JDK 17 and Scala 2.13 / kafka.server.KRaftClusterTest.testIncrementalAlterConfigs()

@showuon showuon merged commit 252e095 into apache:trunk Apr 21, 2022
@showuon
Copy link
Copy Markdown
Member

showuon commented Apr 21, 2022

@ruanliang-hualun , thanks for your contribution!

philipnee added a commit to philipnee/kafka that referenced this pull request Apr 22, 2022
revert test commit

test commit

revert test commit

MINOR: Supplement the description of `Valid Values` in the documentation of `compression.type` (apache#11985)

Because a validator is added to ProducerConfig.COMPRESSION_TYPE_CONFIG and KafkaConfig.CompressionTypeProp, the corresponding testCase is improved to verify whether the wrong value of compression.type will throw a ConfigException.

Reviewers: Mickael Maison <mickael.maison@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

MINOR: Correct Connect docs on connector/task states (apache#11914)

The `DESTROYED` state is represented internally as a tombstone record when running in distributed mode and by the removal of the connector/task from the in-memory status map when running in standalone mode. As a result, it will never appear to users of the REST API, and we should remove mention of it from our docs so that developers creating tooling against the REST API don't write unnecessary logic to account for that state.

Reviewers: Mickael Maison <mickael.maison@gmail.com>

KAFKA-13828; Ensure reasons sent by the consumer are small (apache#12043)

This PR reworks the reasons used in the ConsumerCoordinator to ensure that they remain reasonably short.

Reviewers: Bruno Cadonna <bruno@confluent.io>

KAFKA-13542: Add rebalance reason in Kafka Streams (apache#12018)

Reviewers: Bruno Cadonna <bruno@confluent.io>, David Jacot <djacot@confluent.io>

MINOR: Verify stopReplica if broker epoch not stale (apache#12040)

Verify that ReplicaManager.stopReplica is called if the stop replica
request doesn't result in a stale broker epoch error.

Reviewers: Mickael Maison <mimaison@users.noreply.github.com>

KAFKA-13651; Add audit logging to `StandardAuthorizer` (apache#12031)

This patch adds audit support through the kafka.authorizer.logger logger to StandardAuthorizer. It
follows the same conventions as AclAuthorizer with a similarly formatted log message. When
logIfAllowed is set in the Action, then the log message is at DEBUG level; otherwise, we log at
trace. When logIfDenied is set, then the log message is at INFO level; otherwise, we again log at
TRACE.

Reviewers: Colin P. McCabe <cmccabe@apache.org>

KAFKA-13743: Prevent topics with conflicting metrics names from being created in KRaft mode apache#11910

In ZK mode, the topic "foo_bar" will conflict with "foo.bar" because of limitations in metric
names. We should implement this in KRaft mode.  This PR also changes TopicCommandIntegrationTest to
support KRaft mode.

Reviewers: Colin P. McCabe <cmccabe@apache.org>

KAFKA-12613: Fix inconsistent validation logic between KafkaConfig and LogConfig (apache#10472)

Reviewers: Mickael Maison <mickael.maison@gmail.com>

KAFKA-13823 Feature flag changes from KIP-778 (apache#12036)

This PR includes the changes to feature flags that were outlined in KIP-778.  Specifically, it
changes UpdateFeatures and FeatureLevelRecord to remove the maximum version level. It also adds
dry-run to the RPC so the controller can actually attempt the upgrade (rather than the client). It
introduces an upgrade type enum, which supersedes the allowDowngrade boolean. Because
FeatureLevelRecord was unused previously, we do not need to introduce a new version.

The kafka-features.sh tool was overhauled in KIP-778 and now includes the describe, upgrade,
downgrade, and disable sub-commands.  Refer to
[KIP-778](https://cwiki.apache.org/confluence/display/KAFKA/KIP-778%3A+KRaft+Upgrades) for more
details on the new command structure.

Reviewers: Colin P. McCabe <cmccabe@apache.org>, dengziming <dengziming1993@gmail.com>

MINOR: Update LICENSE-binary (apache#12051)

Updates the license file.

Reviewer: Bill Bejeck <bbejeck@apache.org>

MINOR: Remove redundant conditional judgments in Selector.clear() (apache#12048)

Condition 'sendFailed' is always 'false' when reached.

Reviewers: Guozhang Wang <wangguoz@gmail.com>

MINOR: ignore unused configuration when ConsumerCoordinator is not constructed (apache#12041)

Following PR apache#11940, ignore unused config when ConsumerCoordinator is not constructed.

Reviewers: Guozhang Wang <wangguoz@gmail.com>

[MINOR] Update upgrade documentation for 3.2 (apache#12055)

Reviewer: Bruno Cadonna <cadonna@apache.org>

MINOR: Move some TopicCommand and ConfigCommand integration tests to unit tests (apache#12024)

Move some TopicCommand and ConfigCommand integration tests to unit tests to speed up the tests

Reviewers: Luke Chen <showuon@gmail.com>

MINOR: Improve the description of principal under different mechanisms of sasl (apache#11947)

Reviewers: Mickael Maison <mickael.maison@gmail.com>

MINOR: update comment in LocalLog.replaceSegments() (apache#12054)

Reviewers: Luke Chen <showuon@gmail.com>

MINOR: Make link in quickstart dynamic (apache#12057)

Reviewer: Matthias J. Sax <mjsax@apache.org>

KAFKA-13769: Explicitly route FK join results to correct partitions (apache#11945)

Prior to this commit FK response sink routed FK results to
SubscriptionResolverJoinProcessorSupplier using the primary key.

There are cases, where this behavior is incorrect. For example,
if KTable key serde differs from the data source serde which might
happen without a key changing operation.

Instead of determining the resolver partition by serializing the PK
this patch includes target partition in SubscriptionWrapper payloads.
Default FK response-sink partitioner extracts the correct partition
from the value and routes the message accordingly.

Reviewers: Matthias J. Sax <matthias@confluent.io>

KAFKA-13807: Fix incrementalAlterConfig and refactor some things (apache#12033)

Ensure that we can set log.flush.interval.ms at the broker or cluster level via
IncrementalAlterConfigs. This was broken by KAFKA-13749, which added log.flush.interval.ms as the
second synonym rather than the first. Add a regression test to DynamicConfigChangeTest.

Create ControllerRequestContext and pass it to every controller API. This gives us a uniform way to
pass through information like the deadline (if there is one) and the Kafka principal which is
making the request (in the future we will want to log this information).

In ControllerApis, enforce a timeout for broker heartbeat requests which is equal to the heartbeat
request interval, to avoid heartbeats piling up on the controller queue. This should have been done
previously, but we overlooked it.

Add a builder for ClusterControlManager and ReplicationControlManager to avoid the need to deal
with a lot of churn (especially in test code) whenever a new constructor parameter gets added for
one of these.

In ControllerConfigurationValidator, create a separate function for when we just want to validate
that a ConfigResource is a valid target for DescribeConfigs. Previously we had been re-using the
validation code for IncrementalAlterConfigs, but this was messy.

Split out the replica placement code into a separate package and reorganize it a bit.

Reviewers: David Arthur <mumrah@gmail.com

MINOR: Correct spelling errors in KafkaRaftClient (apache#12061)

Correct spelling errors in KafkaRaftClient

Reviewers: José Armando García Sancio <jsancio@users.noreply.github.com>

MINOR: Fix TestDowngrade.test_upgrade_and_downgrade (apache#12027)

The second validation does not verify the second bounce because the verified producer and the verified consumer are stopped in `self.run_validation`. This means that the second `run_validation` just spit out the same information as the first one. Instead, we should just run the validation at the end.

Reviewers: Jason Gustafson <jason@confluent.io>

KAFKA-8785: fix request timeout by waiting for metadata cache up-to-date (apache#11681)

The reason why this test is flaky is because we have race condition at the beginning of the test, when brokers are staring up, and the adminClient is requesting for brokers metadata. Once the adminClient only got partial metadata, the test will fail, because in these tests, brokers will be shutdown to test leader election.

Fix this issue by explicitly waiting for metadata cache up-to-date in waitForReadyBrokers, and let admin client get created after waitForReadyBrokers.

Reviewers: Jason Gustafson <jason@confluent.io>, David Jacot <djacot@confluent.io>, dengziming <dengziming1993@gmail.com>

KAFKA-13832: Fix flaky testAlterAssignment (apache#12060)

In KRaft mode the metadata is not propagate in time, so we should should wait for it before make assertions.

Reviewers:  Luke Chen <showuon@gmail.com>

KAFKA-13654: Extend KStream process with new Processor API (apache#11993)

Updates the KStream process API to cover the use cases
of both process and transform, and deprecate the KStream transform API.

Implements KIP-820

Reviewer: John Roesler <vvcephei@apache.org>

KAFKA-13835: Fix two bugs related to dynamic broker configs in KRaft (apache#12063)

Fix two bugs related to dynamic broker configs in KRaft. The first bug is that we are calling reloadUpdatedFilesWithoutConfigChange when a topic configuration is changed, but not when a
broker configuration is changed. This is backwards. This function must be called only for broker
configs, and never for topic configs or cluster configs.

The second bug is that there were several configurations such as max.connections which are related
to broker listeners, but which do not involve changing the registered listeners. We should support
these configurations in KRaft. This PR fixes the configuration change validation to support this case.

Reviewers: Jason Gustafson <jason@confluent.io>, Matthew de Detrich <mdedetrich@gmail.com>

KAFKA-10095: Add stricter assertion in LogCleanerManagerTest (apache#12004)

Reviewers: Mickael Maison <mickael.maison@gmail.com>

MINOR: Scala cleanups in core (apache#12058)

Reviewers: Luke Chen <showuon@gmail.com>, Divij Vaidya <diviv@amazon.com>

KAFKA-13799: Improve documentation for Kafka zero-copy (apache#12052)

Improve documentation for Kafka zero-copy. Kafka combines pagecache and zero-copy to greatly improve message consumption efficiency. But zero-copy only works in PlaintextTransportLayer.

Reviewers: Divij Vaidya <divijvaidya13@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

KAFKA-13785: [5/N][emit final] cache for time ordered window store (apache#12030)

A new cache for RocksDBTimeOrderedWindowStore. Need this because RocksDBTimeOrderedWindowStore's key ordering is different from CachingWindowStore which has issues for MergedSortedCacheWindowStoreIterator

Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>

KAFKA-13588: consolidate `changelogFor` methods to simplify the generation of internal topic names (apache#11703)

Reviewers: Guozhang Wang <wangguoz@gmail.com>

HOTFIX: fix broken trunk due to conflicting and overlapping commits (apache#12074)

Reviewers: Victoria Xia <victoria.xia@confluent.io>, David Arthur <mumrah@gmail.com>

MINOR: cleanup for postProcessAndValidateIdempotenceConfigs method (apache#12069)

Reviewers: Luke Chen <showuon@gmail.com>

KAFKA-13834: fix drain batch starving issue (apache#12066)

In drainBatchesForOneNode method, there's possibility causing some partitions in a node will never get picked. Fix this issue by maintaining a drainIndex for each node.

Reviewers: Luke Chen <showuon@gmail.com>, RivenSun <91005273+RivenSun2@users.noreply.github.com>

MINOR: Improve Gradle Caching and Fix Deprecations (apache#12003)

* Fix UP-TO-DATE check in `create*VersionFile` tasks

`create*VersionFile` tasks explicitly declared output UP-TO-DATE status
 as being false. This change properly sets the inputs to
`create*VersionFile` tasks to the `commitId` and `version` values and
sets `receiptFile` locally rather than in an extra property.

* Enable output caching for `process*Messages` tasks

`process*Messages` tasks did not have output caching enabled. This
change enables that caching, as well as setting a property name and
RELATIVE path sensitivity.

* Fix existing Gradle deprecations

Replaces `JavaExec#main` with `JavaExec#mainClass`

Replaces `Report#destination` with `Report#outputLocation`

Adds a `generator` configuration to projects that need to resolve
the `generator` project (rather than referencing the runtimeClasspath
of the `generator` project from other project contexts.

Reviewers: Mickael Maison <mickael.maison@gmail.com>


MINOR; Fix partition change record noop check (apache#12073)

When LeaderRecoveryState was added to the PartitionChangeRecord, the
check for being a noop was not updated. This commit fixes that and
improves the associated test to avoid this oversight in the future.

Reviewers: Colin Patrick McCabe <cmccabe@apache.org>

MINOR: revert back to 60s session timeout for static membership test (apache#11881)

Reviewers: Guozhang Wang <wangguoz@gmail.com>

KAFKA-13841: Fix a case where we were unable to place on fenced brokers in KRaft mode (apache#12075)

This PR fixes a case where we were unable to place on fenced brokers In KRaft mode. Specifically,
if we had a broker registration in the metadata log, but no associated heartbeat, previously the
HeartbeatManager would not track the fenced broker. This PR fixes this by adding this logic to the
metadata log replay path in ClusterControlManager.

Reviewers: David Arthur <mumrah@gmail.com>, dengziming <dengziming1993@gmail.com>
@showuon
Copy link
Copy Markdown
Member

showuon commented Apr 23, 2022

@ruanliang-hualun , I found the test should add one more test case, which is "normal" test case. We can put the maxSize as a large one, and expect all the partitions should be drained. ex:

// new added
// set maxSize as a max value, so that the all partitions in 2 nodes should be drained: node1 => [tp1, tp2], node2 => [tp3, tp4]
Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0);
        verifyTopicPartitionInBatches(batches1, tp1, tp2, tp3, tp4);

        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
        accum.append(tp4, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());



// original tests
// drain batches from 2 nodes: node1 => tp1, node2 => tp3, because the max request size is full after the first batch drained
Map<Integer, List<ProducerBatch>> batches1 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
        verifyTopicPartitionInBatches(batches1, tp1, tp3);

What do you think?
If you think it makes sense, are you interested in submitting another PR to add the test?
Thank you.

@ruanliang-hualun
Copy link
Copy Markdown
Contributor Author

@showuon I am glad to submit another PR to add the test, thanks for your reminding

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants