Skip to content

Kafka 4060 remove zk client dependency in kafka streams followup re-branched from trunk#2389

Closed
hjafarpour wants to merge 5 commits intoapache:trunkfrom
hjafarpour:KAFKA-4060-Remove-ZkClient-dependency-in-Kafka-Streams-followup-from-trunk
Closed

Kafka 4060 remove zk client dependency in kafka streams followup re-branched from trunk#2389
hjafarpour wants to merge 5 commits intoapache:trunkfrom
hjafarpour:KAFKA-4060-Remove-ZkClient-dependency-in-Kafka-Streams-followup-from-trunk

Conversation

@hjafarpour
Copy link
Copy Markdown

Re-branched the trunk and applied the changes to the new branch to simplify commit log.

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/950/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/952/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/950/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/959/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/961/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/959/
Test PASSed (JDK 7 and Scala 2.10).

} else {
throw new StreamsException("Inconsistent response received from broker " + brokerId +
", expected correlation id " + clientRequest.correlationId() + ", but received " +
throw new StreamsException("Inconsistent response received from the broker, expected correlation id " + clientRequest.correlationId() + ", but received " +
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity, why do we remove the broker id from the message?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The broker id is not available and I didn't want to call getBrokerId.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ClientRequest.destination is the broker id.

@enothereska
Copy link
Copy Markdown
Contributor

This PR removes the delete topic, and throws an exception is the number of partitions has changed. However, the number of partitions always changes in some cases, due to the topic auto-create issue. As I understand it, with this PR an app will fail. How do we address that? @hjafarpour @guozhangwang

Copy link
Copy Markdown
Contributor

@dguy dguy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments. Also, AFAICT StreamsKafkaClient doesn't have any tests - this should be fixed

if (existingTopicNamesPartitions.get(topic.name()) != null) {
if (existingTopicNamesPartitions.get(topic.name()) != topicsPartitionsMap.get(topic)) {
deleteTopics.put(topic, topicsPartitionsMap.get(topic));
throw new StreamsException("Internal topic with invalid partitons. Use 'kafka.tools.StreamsResetter' tool to clean up invalid topics before proceesing.");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: - proceesing -> processing

final ClientResponse clientResponse = sendRequest(clientRequest);

if (!(clientResponse.responseBody() instanceof CreateTopicsResponse)) {
throw new StreamsException("Inconsistent response type for internal topic creation request. Expected CreateTopicsResponse but received " + clientResponse.responseBody().getClass().getName());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably needs to check clientResponse.hasResponse() otherwise it could throw NullPointerException

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the check.

final ClientRequest clientRequest = kafkaClient.newClientRequest(getBrokerId(), new MetadataRequest.Builder(Arrays.asList(topic)), Time.SYSTEM.milliseconds(), true, null);
final ClientResponse clientResponse = sendRequest(clientRequest);
if (!(clientResponse.responseBody() instanceof MetadataResponse)) {
throw new StreamsException("Inconsistent response type for internal topic metadata request. Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: need to check clientResponse.hasResponse()

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the check.

public Collection<MetadataResponse.TopicMetadata> fetchTopicsMetadata() {

final ClientRequest clientRequest = kafkaClient.newClientRequest(getBrokerId(), new MetadataRequest.Builder(null), Time.SYSTEM.milliseconds(), true, null);
final ClientResponse clientResponse = sendRequest(clientRequest);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as above

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the check.

@enothereska
Copy link
Copy Markdown
Contributor

Ok, that seems to be in a different PR: #2379. Got it, thanks.

* partitions delete them and create new ones with correct number of partitons along with the non existing topics.
* Prepares a given internal topic.
* If the topic does not exist creates a new topic.
* If the topic with the correct number of partitions exists ignores it.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add "...exists"

@guozhangwang
Copy link
Copy Markdown
Contributor

@hjafarpour Could you address the above comments?

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/996/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/994/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/994/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/999/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1001/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/999/
Test PASSed (JDK 7 and Scala 2.10).

asfgit pushed a commit that referenced this pull request Jan 18, 2017
…e already exist with unexpected number of partitions

Re-branched the trunk and applied the changes to the new branch to simplify commit log.

Author: Hojjat Jafarpour <hojjat@Hojjat-Jafarpours-MBP.local>

Reviewers: Ismael Juma, Damian Guy, Eno Thereska, Guozhang Wang

Closes #2389 from hjafarpour/KAFKA-4060-Remove-ZkClient-dependency-in-Kafka-Streams-followup-from-trunk

Address Ismael's comments upon merging

(cherry picked from commit 8e2cbae)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
@guozhangwang
Copy link
Copy Markdown
Contributor

Addressed @ijuma 's comments upon merging to trunk and 0.10.2.

@asfgit asfgit closed this in 8e2cbae Jan 18, 2017
soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
…e already exist with unexpected number of partitions

Re-branched the trunk and applied the changes to the new branch to simplify commit log.

Author: Hojjat Jafarpour <hojjat@Hojjat-Jafarpours-MBP.local>

Reviewers: Ismael Juma, Damian Guy, Eno Thereska, Guozhang Wang

Closes apache#2389 from hjafarpour/KAFKA-4060-Remove-ZkClient-dependency-in-Kafka-Streams-followup-from-trunk

Address Ismael's comments upon merging
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants