KAFKA-13888: Addition of Information in DescribeQuorumResponse about Voter Lag #12206

Merged
hachikuji merged 16 commits into apache:trunk from niket-goel:kafka-13888-describe-quorum-additions
Jun 15, 2022

@niket-goel
Contributor

This commit adds an Admin API handler for DescribeQuorum Request and also
adds in two new fields LastFetchTimestamp and LastCaughtUpTimestamp to
the DescribeQuorumResponse as described by KIP-836.

This commit does not implement the newly added fields. Those will be
added in a subsequent commit.

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@niket-goel marked this pull request as ready for review May 24, 2022 21:56
}

/**
* Describe the state of the raft quorum
Contributor

How about "metadata quorum" or "kraft quorum"? Also nit: missing period.

observers.add(new ReplicaState(entry.getKey(), entry.getValue()));
}
QuorumInfo info = new QuorumInfo(topicName, leaderId, voters, observers);
return info;
Contributor

nit: this looks misaligned

private final long lastFetchTimeMs;
private final long lastCaughtUpTimeMs;

public ReplicaState(int replicaId, long logEndOffset) {
Contributor

Do we need to expose this?

Contributor Author

Sorry, I did not get what you meant. We need access to it in KafkaAdminClient.java. Are you suggesting it be protected, that we have a builder instead of exposing the constructor, or that we build it through the QuorumInfo itself?

return logEndOffset;
}

public long lastFetchTimeMs() {
Contributor

Can we document the result for older versions of DescribeQuorum? I am tempted to suggest that we change the type to OptionalLong to make it clear that the value may be absent for older versions.

Contributor Author

OptionalLong makes sense. Let me make that change.

/**
* The name of the internal raft metadata topic
*/
private static final String METADATA_TOPIC_NAME = "__cluster_metadata";
Contributor

@hachikuji, May 24, 2022

Hmm, I hadn't really been thinking about the fact that we would have to expose this to the client. I guess that is the consequence of having such a general DescribeQuorum API. This makes me wonder if we ought to be more forward looking with the naming here. Suppose that we ultimately decide to use raft for partition replication as well. Then we might want to be able to use DescribeQuorum for user partitions as well, but we haven't given ourselves a lot of room for extension in the describeQuorum API. Would it make sense to make the new API more specific to the metadata quorum?

public DescribeMetadataQuorumResult describeMetadataQuorum(DescribeMetadataQuorumOptions options)

It is more verbose, but it is also clearer.

We should also move this constant to org.apache.kafka.common.internals.Topic.

Contributor Author

This is a great point! I think it makes sense to give ourselves wiggle room and call this MetadataQuorum.
Re: moving the constant -- I was actually looking for a common place to put it so that both the clients and the server code can use the same value. Is there any class that they share? Right now the name for the topic on the server side is defined in KafkaRaftServer.

/**
* This is used to describe per-partition state in the DescribeQuorumResponse.
*/
public class QuorumInfo {
Contributor

Would it make sense to implement equals and hashCode? That is often useful for testing. Also it would be nice to have a good toString implementation.
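
As an illustration of the suggestion above, a value class with equals, hashCode and toString might look like the following minimal sketch. The class name `ReplicaStateSketch` and its two fields are hypothetical stand-ins chosen for this example; they are not the code from the PR.

```java
import java.util.Objects;

// Hypothetical stand-in for a small value class such as ReplicaState.
final class ReplicaStateSketch {
    private final int replicaId;
    private final long logEndOffset;

    ReplicaStateSketch(int replicaId, long logEndOffset) {
        this.replicaId = replicaId;
        this.logEndOffset = logEndOffset;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ReplicaStateSketch that = (ReplicaStateSketch) o;
        return replicaId == that.replicaId && logEndOffset == that.logEndOffset;
    }

    @Override
    public int hashCode() {
        // Must use the same fields as equals so equal objects hash alike.
        return Objects.hash(replicaId, logEndOffset);
    }

    @Override
    public String toString() {
        return "ReplicaStateSketch(replicaId=" + replicaId
            + ", logEndOffset=" + logEndOffset + ")";
    }
}
```

With these in place, a test can compare an expected and an actual object directly, and a failed assertion prints readable field values.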


@Timeout(120)
@Tag("integration")
class DescribeQuorumTest {
Contributor

We have another class DescribeQuorumRequestTest. Would it make sense to move this test there? Perhaps we could generalize it to DescribeQuorumIntegrationTest or something like that.

if (quorumResponse.data().errorCode() == Errors.NONE.code()) {
future.complete(createQuorumResult(quorumResponse));
} else {
future.completeExceptionally(Errors.forCode(quorumResponse.data().errorCode()).exception());
Contributor

It would be helpful to have a couple unit tests in KafkaAdminClientTest covering failure and success cases.

.collect(Collectors.toList());
}

public static String nodesToVoterConnections(List<Node> nodes) {
Contributor

Seems like we're not using this anywhere. Included by mistake perhaps?

Contributor Author

It was included by mistake. Thanks for pointing it out.

* @param options The options to use when describing the quorum.
* @return The DescribeQuorumResult.
*/
DescribeQuorumResult describeQuorum(DescribeQuorumOptions options);
Contributor

By the way, usually these APIs have a default option which leaves out the XOptions class. For example, see describeFeatures() above?

Also nit: can we not split up the describeFeatures APIs?
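
The "default option" pattern mentioned here can be sketched as an interface whose no-argument overload delegates to the options-taking one. The names `AdminSketch` and `DescribeOptionsSketch`, the String return type, and the timeout field are all hypothetical simplifications for this example; the real Admin interface and options classes differ.

```java
// Hypothetical, simplified options holder with a fluent setter.
final class DescribeOptionsSketch {
    private int timeoutMs = 30_000; // illustrative default, not a Kafka value

    DescribeOptionsSketch timeoutMs(int timeoutMs) {
        this.timeoutMs = timeoutMs;
        return this;
    }

    int timeoutMs() {
        return timeoutMs;
    }
}

// Hypothetical, simplified admin interface.
interface AdminSketch {
    // Convenience overload: callers with no special needs skip the options class.
    default String describeQuorum() {
        return describeQuorum(new DescribeOptionsSketch());
    }

    // Full overload that implementations must provide.
    String describeQuorum(DescribeOptionsSketch options);
}
```

Implementations only provide the options-taking method, so the default behaviour lives in one place.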

@hachikuji
Contributor

cc @dengziming for reviews

@niket-goel
Contributor Author

Published another revision addressing @hachikuji's comments.

Member

@dajac left a comment

@niket-goel Thanks for the PR. I left a few comments.


private final KafkaFuture<QuorumInfo> quorumInfo;

public DescribeMetadataQuorumResult(KafkaFuture<QuorumInfo> quorumInfo) {
Member

nit: Could we keep it package private?

String topicName = response.getTopicNameByIndex(partition);
Integer leaderId = response.getPartitionLeaderId(topicName, partition);
List<ReplicaState> voters = new ArrayList<>();
for (Map.Entry<Integer, Long> entry: response.getVoterOffsets(topicName, partition).entrySet()) {
Member

nit: We could use response.getVoterOffsets(topicName, partition).forEach((replicaId, logEndOffset) ->. That makes the code a bit more readable in my opinion.
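
A minimal sketch of the `Map.forEach` form suggested above, assuming the voter offsets arrive as a `Map<Integer, Long>` as in the quoted diff. The class `VoterOffsetsSketch` and the string output are hypothetical; the PR builds ReplicaState objects instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating Map.forEach in place of an entrySet() loop.
final class VoterOffsetsSketch {
    // Produces "replicaId@logEndOffset" strings as a stand-in for ReplicaState objects.
    static List<String> describe(Map<Integer, Long> voterOffsets) {
        List<String> voters = new ArrayList<>();
        // The lambda receives key and value directly, so no Map.Entry accessors are needed.
        voterOffsets.forEach((replicaId, logEndOffset) ->
            voters.add(replicaId + "@" + logEndOffset));
        return voters;
    }
}
```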

Integer leaderId = response.getPartitionLeaderId(topicName, partition);
List<ReplicaState> voters = new ArrayList<>();
for (Map.Entry<Integer, Long> entry: response.getVoterOffsets(topicName, partition).entrySet()) {
voters.add(new ReplicaState(entry.getKey(), entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));
Member

Hmm, OptionalLong.empty() seems wrong here. Don't you need to put the last fetch time and the last caught-up time?

}
List<ReplicaState> observers = new ArrayList<>();
for (Map.Entry<Integer, Long> entry: response.getObserverOffsets(topicName, partition).entrySet()) {
observers.add(new ReplicaState(entry.getKey(), entry.getValue(), OptionalLong.empty(), OptionalLong.empty()));
Member

Same question here.


@Override
void handleFailure(Throwable throwable) {
completeAllExceptionally(Collections.singletonList(future), throwable);
Member

nit: Why not call future.completeExceptionally directly?

}
return new DescribeQuorumResponse(
new DescribeQuorumResponseData()
.setErrorCode(error.code()));
Member

nit: Indentation seems off here.

Comment on lines +610 to +612
0, 0, 0,
singletonList(new DescribeQuorumResponseData.ReplicaState()),
singletonList(new DescribeQuorumResponseData.ReplicaState()))
Member

It would be better to fully populate the response here. That would have caught the issue that I mentioned earlier.

Comment on lines +4895 to +4896
final ExecutionException e = assertThrows(ExecutionException.class, future::get);
assertEquals(e.getCause().getClass(), Errors.INVALID_REQUEST.exception().getClass());
Member

nit: You could use TestUtils.assertFutureThrows.

}

@ClusterTest
def testDescribeQuorumRequestToBrokers() = {
Member

Looking at the other tests, this test suite does not seem to be the right place for this test. I wonder if KRaftClusterTest would be more appropriate, for instance, but I don't feel strongly about this.

Contributor Author

I actually had this test in a separate suite akin to KRaftClusterTest before, but moved it here because Jason pointed out that a test file for this API already existed. I think I am going to move the test to where you suggested. It does make more sense to me there.

val log = LoggerFactory.getLogger(classOf[DescribeQuorumIntegrationTest])

@ClusterTest(clusterType = Type.ZK)
def testDescribeQuorumNotSupportedByZkBrokers(): Unit = {
Member

Should we extend testDescribeQuorum to verify that new fields are set when v1 is used and not set when v0 is used?

Contributor Author

Thanks for the detailed review @dajac. Some of the misses were me working with multiple versions of the code. Appreciate you catching those. I will update the next version with all of your comments addressed. One quick question about this comment though:
Is there an example of this somewhere that I can look at? Not sure how to change and verify the version of the request.

Contributor Author

Another issue is that the new fields default to -1 when not set, but are also -1 when the value for a particular voter or observer is unknown. I am thinking about whether there is a way to test this reliably.

Member

@dengziming left a comment

Thanks for this work @niket-goel , I have 2 suggestions before going into details. Firstly, we can add a test in PlaintextAdminIntegrationTest for this change, the second one is less important, it's recommended to use AdminApiHandler instead of using runnable.call directly, you can refer to FenceProducersHandler.java as a simple example.

Contributor

It is very common for API details to change during implementation. Once we're satisfied and ready to merge, we should update the KIP. Typically we would also send a message to the vote thread in case there are any concerns.

Contributor Author

I will update the KIP and the thread once this PR is approved to reflect the changes here.

Contributor

nit: conventionally (if perhaps not always consistently), we prefer parentheses for toString implementations

Contributor

nit: do we need to expose this? Usually we only expose the minimum necessary constructors since we're often stuck with what we expose for a long time.

Contributor Author

@niket-goel, May 25, 2022

I need the constructor access in DescribeQuorumResponse.java. I can sort of work around that by returning the fields raw or encapsulated in a different object, but I thought this was simpler. If we want to prioritize hiding the constructor more, I can change it so that getVoterInfo and getObserverInfo return raw fields. Do you have any thoughts on this?

Contributor Author

Never mind. I think I was just being a little stupid :)

Comment thread clients/src/main/resources/common/message/DescribeQuorumRequest.json Outdated
Contributor

nit: getVoterInfo and getObserverInfo seem basically the same. Feels like we can factor out the common logic.

Contributor

Hmm.. We're checking the top-level error code, but the response also has a partition-level error to check.

Contributor

I'm a little confused what happened here, but it looks like we should get rid of this class and rename DescribeQuorumTest to DescribeQuorumIntegrationTest. Also, did we lose the admin integration test you added?

Contributor Author

I changed it so that the new test resides in KRaftClusterTest. This was David's suggestion and I felt it made more sense there, especially because that new test used a different test framework than these ones.
The change in the name got included by mistake and I will revert it to what it was.

Member

@dengziming left a comment

Thanks for the update @niket-goel , left some comments.

Member

The class name and file name are inconsistent.

Member

Ditto, The class name and file name are inconsistent.

Member

response.getTopicNameByIndex(partition) is a little confusing here, how can we get a topic by partition, I think we should rename partition to topicIndex or use response.getTopicNameByIndex(0) directly.

Member

I wonder if we can send it to controllerSockerServer directly? for example, we set bootstrap.server=localhost:9093, we should prevent a client from doing this.

Member

It's worth checking partitionData.observers() here too.

Member

ApiKeys.DESCRIBE_QUORUM.id, DESCRIBE_QUORUM.highestSupportVersion, DESCRIBE_QUORUM.lowestSupportVersion may be better here.

@niket-goel
Contributor Author

Thanks for review @dengziming .

A few comments:

Firstly, we can add a test in PlaintextAdminIntegrationTest for this change

I looked at the file and the tests there look similar to what was added to KRaftClusterTest. Could you check if the intent was to run an integration test like that? We can always increase the checks that test does.

the second one is less important, it's recommended to use AdminApiHandler instead of using runnable.call directly, you can refer to FenceProducersHandler.java as a simple example.

I looked at the class and it looks like a wrapper for the runnable call. Do you mind if we make that improvement in a subsequent change (just trying to get this PR under control :) )

I wonder if we can send it to controllerSockerServer directly? for example, we set bootstrap.server=localhost:9093, we should prevent a client from doing this.

A couple of things here -- This code existed before this change (I just moved it). I would love to improve it though. I just did not understand what exactly your comment meant. Could you please elaborate?

Apart from the above questions I think I have addressed all the other concerns everyone raised in the latest version.

@dengziming
Member

@niket-goel Thanks for the reply, I left 2 minor comments. The class name is still DescribeQuorumIntegrationTest but the file name is DescribeQuorumTest.scala; you should make them the same. I ran your newly added tests locally and some of them failed, so there may still be some bugs to fix.

For the last problem, I mean we can also get quorum directly from the controller, I can give you an example:

For a single node KRaft cluster, we can get quorum info from BrokerServer:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092"); // connect to BrokerServer 9092
AdminClient client = AdminClient.create(properties);
client.describeMetadataQuorum();

We can also connect to ControllerServer:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9093"); // connect to ControllerServer 9093
AdminClient client = AdminClient.create(properties);
client.describeMetadataQuorum();

We can leave this problem to a separate PR since it's unrelated here.

Member

we can also check observers here.

Member

nit: add : Unit after method

@niket-goel
Contributor Author

Thanks @dengziming . I have pushed another version where all tests are passing locally. Appreciate your help with the review.

Contributor

Left in by mistake?

Contributor

@hachikuji left a comment

Thanks for the updates. Left a few more comments.

Contributor

nit: these are all misaligned

Contributor

nit: the idiomatic way to write this is

voterData.foreach { state =>
  ...
}

Contributor

nit: we should use assertEquals. The advantage is that we can see what the actual value was in the failure message, which is sometimes useful to understand the failure.

Contributor

How about this?

for (version <- ApiKeys.DESCRIBE_QUORUM.allVersions.asScala) {

Contributor

Seems unused?

private QuorumInfo createQuorumResult(final DescribeQuorumResponse response) {
Integer partition = 0;
String topicName = response.getTopicNameByIndex(partition);
Integer leaderId = response.getPartitionLeaderId(topicName, partition);
Contributor

@niket-goel any comment here? I think this part still reads awkward. Converting to an intermediate map is conventional. An alternative would be to do a quick validation of the response. We can structure the checks like this:

  1. Check top-level error code
  2. Verify only one topic in the response which matches metadata topic
  3. Verify only one partition in the response with id 0
  4. Check partition-level error code.

This is similar to how we handle the request in KafkaRaftClient.handleDescribeQuorumRequest.
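
The four checks listed above could be structured roughly as in the following sketch. Everything here is hypothetical: `QuorumResponseCheck` and its nested `Response`, `Topic` and `Partition` classes are simplified stand-ins for the generated response data types, and error code 0 is assumed to mean "no error".

```java
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified model of a DescribeQuorum response and its validation.
final class QuorumResponseCheck {
    static final String METADATA_TOPIC = "__cluster_metadata";
    static final short NONE = 0; // assumption: 0 means "no error"

    static final class Partition {
        final int index;
        final short errorCode;
        Partition(int index, short errorCode) { this.index = index; this.errorCode = errorCode; }
    }

    static final class Topic {
        final String name;
        final List<Partition> partitions;
        Topic(String name, List<Partition> partitions) { this.name = name; this.partitions = partitions; }
    }

    static final class Response {
        final short errorCode;
        final List<Topic> topics;
        Response(short errorCode, List<Topic> topics) { this.errorCode = errorCode; this.topics = topics; }
    }

    // Returns a message for the first failed check, or empty if the response is usable.
    static Optional<String> validate(Response response) {
        // 1. Top-level error code.
        if (response.errorCode != NONE)
            return Optional.of("top-level error " + response.errorCode);
        // 2. Exactly one topic, and it must be the metadata topic.
        if (response.topics.size() != 1)
            return Optional.of("expected 1 topic, got " + response.topics.size());
        Topic topic = response.topics.get(0);
        if (!METADATA_TOPIC.equals(topic.name))
            return Optional.of("unexpected topic " + topic.name);
        // 3. Exactly one partition, with id 0.
        if (topic.partitions.size() != 1)
            return Optional.of("expected 1 partition, got " + topic.partitions.size());
        Partition partition = topic.partitions.get(0);
        if (partition.index != 0)
            return Optional.of("unexpected partition " + partition.index);
        // 4. Partition-level error code.
        if (partition.errorCode != NONE)
            return Optional.of("partition-level error " + partition.errorCode);
        return Optional.empty();
    }
}
```

Ordering the checks this way means an empty topic or partition list is rejected before any element is read.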

Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Contributor

Could we have a test case with a partition-level error?

Member

I wonder why there isn't an observer, we should have 4 observers since we have 4 brokers, this may also not be related to this PR, I will spend some time investigating.

Member

I found the cause here: we set nodeId=-1 if it's a broker, so observers.size==0.

val nodeId = if (config.processRoles.contains(ControllerRole)) {
OptionalInt.of(config.nodeId)
} else {
OptionalInt.empty()
}

I changed it to val nodeId = OptionalInt.of(config.nodeId), then observers.size==4

Member

Do we need to do something about this in this PR?

Member

This comment has not been addressed.

Member

@hachikuji Is it the expected behavior to have zero observers?

@niket-goel force-pushed the kafka-13888-describe-quorum-additions branch from b58f7df to 5a08355 June 2, 2022 18:41
Member

@dajac left a comment

@niket-goel Thanks for the update. I made another pass on it and left some comments.

Member

nit: There is an extra space before with.

Member

nit: We could remove this empty line.

Member

nit: We could remove this empty line.

Member

nit: Should we say Returns a future containing the quorum info.?

Member

  • We should add a message in every exception provided back to the user. If we do this, we can log as debug instead of error.
  • Instead of throwing the exception, we usually complete the future and return.

Contributor Author

Good point about adding the message to the exception being returned.

The behavior of the code block is equivalent to completing the future and returning. I just didn't want to repeat the future completion code. Is there a well-known convention for handling code blocks like this in Java?

Member

nit: We could use .toShort.

Comment thread core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala Outdated
Member

nit: Indentation is off here.

Member

Could we verify anything if version is > 0?

Member

It is curious that we have the topic but not the partition. Is it on purpose, perhaps because we assume only one partition anyway?

Member

I think it's better to add partition here because we are making way for multi-raft.

Contributor Author

The intention was to keep it simple for now (given we are not using the system as multi-raft yet). We could add in the partition information here as well, but then how far do we want to go. Do we want this structure to mirror the TopicData and PartitionData types in the DescribeQuorumResponseData, or do we just want to do something simple (like adding in a partition ID) for now?

Contributor

@niket-goel Maybe we had better leave topic/partition out of this object? I don't think there's any reason to expose __cluster_metadata to users of the admin client.

Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Contributor

Is the intent to retry here? For some errors, such as auth failures, we probably would rather fail fast.

Contributor Author

The reason I added this block is because I noticed a gradle warning which suggested that (with the addition of the general Exception catch block), some runtime exceptions might get hidden. A little reading on this suggested that a best practice is to catch and re-throw runtime exceptions. I guess your comment here is that it is best to just do the same thing with runtime exceptions as we are doing with other exceptions and complete the future with an error. Am I understanding that correctly?

Contributor

Which exceptions are we catching here?

Contributor Author

So this block is me trying to have a single future.completeExceptionally() call in this code block. We are catching UnknownServerException and any exception that might be returned by the server in its response. The generic catch handler can be avoided by not throwing the exceptions returned by the server, and instead just completing exceptionally within the block above. I just find it more maintainable to have single calls to completions etc. If this is creating some other issues in the code, I can change this.

Contributor

UnknownServerException extends KafkaException, which extends RuntimeException. So I think all the errors that we are raising above get re-thrown in the previous catch.
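
The catch-ordering point above can be illustrated with a minimal, self-contained sketch. The exception classes here are hypothetical stand-ins that only mirror the inheritance chain described in the comment (they are not the Kafka classes), and `handle` is an assumed name, not the method from the PR.

```java
import java.util.concurrent.CompletableFuture;

public class CatchOrderSketch {
    // Hypothetical stand-ins that mirror the hierarchy described in the comment:
    // UnknownServerException -> KafkaException -> RuntimeException.
    static class SketchKafkaException extends RuntimeException {
        SketchKafkaException(String message) { super(message); }
    }

    static class SketchUnknownServerException extends SketchKafkaException {
        SketchUnknownServerException(String message) { super(message); }
    }

    // Returns true if the future was completed exceptionally,
    // false if the exception escaped the handler instead.
    static boolean handle(CompletableFuture<String> future) {
        try {
            try {
                throw new SketchUnknownServerException("error from response");
            } catch (RuntimeException e) {
                // Matches first because the exception is a RuntimeException subtype.
                throw e;
            } catch (Exception e) {
                // Never reached for the exception thrown above.
                future.completeExceptionally(e);
            }
        } catch (RuntimeException escaped) {
            // The re-thrown exception propagates out to the caller.
        }
        return future.isCompletedExceptionally();
    }
}
```

In this sketch `handle` returns false: the re-throw in the first catch means the generic handler never completes the future.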

Contributor

nit: are we using this anymore?

Contributor Author

No, we are not. Thanks for the catch!

Contributor

nit: we usually add parentheses for mutators

Comment on lines 4346 to 4347
Member

When lastFetchTimestamp or lastCaughtUpTimestamp is not provided (equal to -1), don't we want to return an empty optional instead of an optional containing -1?

Contributor Author

I went back and forth on that and ended up returning a -1 optional here. I now remember that the original intention was to have an empty optional. Will address this.
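
The sentinel-to-optional conversion discussed here could look roughly like the sketch below. The class and method names are hypothetical and chosen only for illustration; the only assumption taken from the thread is that -1 marks a timestamp that was not provided.

```java
import java.util.OptionalLong;

public class TimestampSketch {
    // Assumed sentinel, per the review thread: -1 means "not provided".
    static final long UNKNOWN_TIMESTAMP = -1L;

    // Hypothetical helper: maps the sentinel to an empty optional
    // and wraps any other value unchanged.
    static OptionalLong toOptional(long timestampMs) {
        return timestampMs == UNKNOWN_TIMESTAMP
            ? OptionalLong.empty()
            : OptionalLong.of(timestampMs);
    }
}
```

With a helper like this, callers can test `isPresent()` rather than comparing against a magic value.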

Contributor

Maybe we should check if size is not equal to 1 here and below. I guess an empty list is also possible.
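
A minimal sketch of the suggested check, assuming the handler expects exactly one entry in a list taken from the response. `requireSingle` is a hypothetical helper name and the exception type is an arbitrary choice for illustration, not what the PR uses.

```java
import java.util.List;

public class SingleEntrySketch {
    // Hypothetical helper: rejects both empty lists and lists with more than
    // one element, then returns the only element.
    static <T> T requireSingle(List<T> items, String what) {
        if (items.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly one " + what + " but got " + items.size());
        }
        return items.get(0);
    }
}
```

Comparing against 1 rather than checking `size() > 1` also covers the empty-list case mentioned in the comment.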

Comment on lines 4350 to 4353
Member

Could we extract this block into a helper method and share it with the voters part?

Contributor Author

Do we really think this is worth another method?

Member

Yes. We should avoid code duplication. Btw, you could also use the stream API here: partition.observers().stream().map(function).collect(Collectors.toList()). I leave this one up to you.
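
As a rough illustration of the suggestion, the sketch below routes both lists through one conversion helper and uses the stream API for the mapping. `RawReplica`, `ReplicaView`, `toView`, and `toViews` are hypothetical names invented for this example; they are not the types or methods in the PR.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ReplicaMappingSketch {
    // Hypothetical stand-in for a replica entry as it appears in the response.
    static class RawReplica {
        final int replicaId;
        final long logEndOffset;

        RawReplica(int replicaId, long logEndOffset) {
            this.replicaId = replicaId;
            this.logEndOffset = logEndOffset;
        }
    }

    // Hypothetical stand-in for the client-facing view of a replica.
    static class ReplicaView {
        final int replicaId;
        final long logEndOffset;

        ReplicaView(int replicaId, long logEndOffset) {
            this.replicaId = replicaId;
            this.logEndOffset = logEndOffset;
        }
    }

    // Single conversion helper, shared by the voter and observer paths.
    static ReplicaView toView(RawReplica raw) {
        return new ReplicaView(raw.replicaId, raw.logEndOffset);
    }

    // Maps a whole list with the stream API instead of a hand-written loop.
    static List<ReplicaView> toViews(List<RawReplica> raws) {
        return raws.stream()
            .map(ReplicaMappingSketch::toView)
            .collect(Collectors.toList());
    }
}
```

Calling `toViews` once for voters and once for observers keeps the per-replica translation in a single place.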

Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Member

Should we check all the data in quorum info in this integration test? Just checking the leader and the number of voters/observers seems weak to me.

Contributor Author

Done!

Member

What about version > 0?

Contributor Author

I think I missed replying to this comment earlier. Apologies about that. I am not sure if we can check anything for versions greater than zero at the moment. As of this PR we are not setting the fields in the response and so we cannot verify that. Was there something you had in mind that we should verify for versions > 0?

Member

In this case, should we just remove that version check and verify all versions? My understanding is that they should all be the same at the moment. We can update the test when we implement the server side. What do you think?

Member

Should we also assert logEndOffset?

Contributor Author

yes.

…Voter Lag

This commit adds an Admin API handler for DescribeQuorum Request and also
adds in two new fields LastFetchTimestamp and LastCaughtUpTimestamp to
the DescribeQuorumResponse as described by KIP-836.

This commit does not implement the newly added fields. Those will be
added in a subsequent commit.
Changes:
Added some unit tests to KafkaAdminClientTest
Merged multiple DescribeQuorumTest files
Some other refactoring
Changes include:
* Fixing fields not being correctly set when processing API response in
  Admin Client
* Modified test to cover the above case
* Test refactoring
* Minor fixes in naming and indentation
* Removed explicit exception handling from DescribeMetadataQuorum::handleResponse.
* Removed an unused method
* Fixed Api handler to return OptionalEmpty for unknown fields.
@niket-goel niket-goel force-pushed the kafka-13888-describe-quorum-additions branch from 6a7575b to 51f7dd2 Compare June 8, 2022 21:33
Member

@dajac dajac left a comment

@niket-goel Thanks for the update. I left a few more minor comments. There are also a few previous comments which have not been addressed.

Comment thread clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java Outdated
Member

This comment may have been missed.

Comment on lines +83 to +84
ReplicaState(int replicaId, long logEndOffset,
OptionalLong lastFetchTimeMs, OptionalLong lastCaughtUpTimeMs) {
Member

nit: The code format seems a bit off here. I think that we would format like this:

ReplicaState(
    int replicaId,
    long logEndOffset,
    OptionalLong lastFetchTimeMs,
    OptionalLong lastCaughtUpTimeMs
) {

public static DescribeQuorumResponse parse(ByteBuffer buffer, short version) {
return new DescribeQuorumResponse(new DescribeQuorumResponseData(new ByteBufferAccessor(buffer), version));
}

Member

nit: This empty line could be removed.

val quorumState = admin.describeMetadataQuorum(new DescribeMetadataQuorumOptions)
val quorumInfo = quorumState.quorumInfo().get()

assertEquals(3000, quorumInfo.leaderId())
Member

nit: We usually omit parentheses for getters. There are a few other cases in this file.

Member

This comment has not been addressed.

assertEquals(3000, quorumInfo.leaderId())
assertEquals(0, quorumInfo.observers.size())
assertEquals(3, quorumInfo.voters.size())
quorumInfo.voters().forEach( voter => {
Member

nit: We usually format blocks as follows: quorumInfo.voters.forEach { voter =>.

Comment thread core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala Outdated
Member

@dajac dajac left a comment

@niket-goel Thanks for the update. Overall, LGTM. I left a few nits and one question.

Comment on lines +84 to +87
int replicaId,
long logEndOffset,
OptionalLong lastFetchTimeMs,
OptionalLong lastCaughtUpTimeMs
Member

nit: We usually use 4-space indentation here.

val quorumInfo = quorumState.quorumInfo.get()

assertEquals(0, quorumInfo.observers.size())
assertEquals(3, quorumInfo.voters.size())
Member

nit: We could omit the parentheses after size. The same goes for the previous line.

.getOrElse(throw new AssertionError("Failed to find leader among current voter states"))
assertTrue(leaderState.logEndOffset > 0)

val voterData = partitionData.currentVoters().asScala
Member

nit: We could omit the parentheses after currentVoters. The same goes for the next line.

Comment on lines +92 to +97
observerData.foreach { state =>
assertTrue(0 < state.replicaId)
assertTrue(0 < state.logEndOffset())
assertEquals(-1, state.lastFetchTimestamp())
assertEquals(-1, state.lastCaughtUpTimestamp())
}
Member

nit: Should we just remove this block if assertEquals(0, observerData.size)?

Member

@hachikuji Is it the expected behavior to have zero observers?

Comment on lines +68 to +69
", voters=" + voters.toString() +
", observers=" + observers.toString() +
Member

nit: I suppose that we could remove the toString calls as they are implicit.

Comment on lines +88 to +90
assertTrue(0 < state.logEndOffset())
assertEquals(-1, state.lastFetchTimestamp())
assertEquals(-1, state.lastCaughtUpTimestamp())
Member

nit: We can also omit the parentheses for those.

@niket-goel
Contributor Author

Thanks for the in-depth review @dajac, @hachikuji, and @dengziming. I have updated the PR with the final nits from @dajac fixed. Are we good to merge with one LGTM, or do we need one more?

Also in response to "Is it the expected behavior to have zero observers?"

@dengziming also raised the same question in a previous comment, and we decided to tackle that in a separate PR. I will cut a JIRA for this.

Member

@dajac dajac left a comment

LGTM. I will merge it when the build completes.

@niket-goel Could you update the KIP and notify the vote thread with the changes? Thanks.

@hachikuji
Contributor

@dajac I think this is the reason there are no observers: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/raft/RaftManager.scala#L188. We should just use config.nodeId consistently regardless of process.roles. @niket-goel @dengziming Do either of you have time to fix this?

Contributor

@hachikuji hachikuji left a comment

Thanks for all the iterations. LGTM.

@niket-goel
Contributor Author

Thanks @hachikuji. I have created https://issues.apache.org/jira/browse/KAFKA-13986 for the observer count issue and will raise a PR once this one is merged.

@hachikuji
Contributor

@niket-goel It looks like we are running into https://issues.apache.org/jira/browse/KAFKA-13940, which is causing the new integration test to be flaky. I think our options are either to handle this error code in the client or to change the raft implementation to throw a retriable error (probably either NotController or NotLeader).

@dajac
Member

dajac commented Jun 14, 2022

@niket-goel It looks like we are running into https://issues.apache.org/jira/browse/KAFKA-13940, which is causing the new integration test to be flaky. I think our options are either to handle this error code in the client or to change the raft implementation to throw a retriable error (probably either NotController or NotLeader).

I lean towards the server side change to be consistent with other APIs.

@niket-goel
Contributor Author

I agree with you guys. I have pushed a commit to remove the flaky test for now. How about we merge this PR as is (without the failing test) and then follow up on KAFKA-13940 to re-enable it?

@niket-goel niket-goel force-pushed the kafka-13888-describe-quorum-additions branch from 57b48ee to 739b400 Compare June 15, 2022 01:24
@hachikuji hachikuji merged commit a126e3a into apache:trunk Jun 15, 2022
@niket-goel niket-goel mentioned this pull request Jun 15, 2022
mjsax pushed a commit to confluentinc/kafka that referenced this pull request Jun 30, 2022
…Voter Lag (apache#12206)

This commit adds an Admin API handler for DescribeQuorum Request and also
adds in two new fields LastFetchTimestamp and LastCaughtUpTimestamp to
the DescribeQuorumResponse as described by KIP-836.

This commit does not implement the newly added fields. Those will be
added in a subsequent commit.

Reviewers: dengziming <dengziming1993@gmail.com>, David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>