Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public class MockAdminClient extends AdminClient {
public static final String DEFAULT_CLUSTER_ID = "I4ZmrWqfT2e-upky_4fdPA";
Expand Down Expand Up @@ -365,51 +366,6 @@ synchronized public DescribeTopicsResult describeTopics(Collection<String> topic
return new DescribeTopicsResult(topicDescriptions);
}

@Override
public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just deprecate public method instead of removing it

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method has default implementation (see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java#L340) which calls the variety describeConfigs(Collection<ConfigResource>, DescribeConfigsOptions) so it is ok to remove the new implementation.

Map<ConfigResource, KafkaFuture<Config>> topicConfigs = new HashMap<>();

if (timeoutNextRequests > 0) {
for (ConfigResource requestedResource : resources) {
KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
future.completeExceptionally(new TimeoutException());
topicConfigs.put(requestedResource, future);
}

--timeoutNextRequests;
return new DescribeConfigsResult(topicConfigs);
}

for (ConfigResource requestedResource : resources) {
if (requestedResource.type() != ConfigResource.Type.TOPIC) {
continue;
}
for (Map.Entry<String, TopicMetadata> topicDescription : allTopics.entrySet()) {
String topicName = topicDescription.getKey();
if (topicName.equals(requestedResource.name()) && !topicDescription.getValue().markedForDeletion) {
if (topicDescription.getValue().fetchesRemainingUntilVisible > 0) {
topicDescription.getValue().fetchesRemainingUntilVisible--;
} else {
TopicMetadata topicMetadata = topicDescription.getValue();
KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
Collection<ConfigEntry> entries = new ArrayList<>();
topicMetadata.configs.forEach((k, v) -> entries.add(new ConfigEntry(k, v)));
future.complete(new Config(entries));
topicConfigs.put(requestedResource, future);
break;
}
}
}
if (!topicConfigs.containsKey(requestedResource)) {
KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
future.completeExceptionally(new UnknownTopicOrPartitionException("Resource " + requestedResource + " not found."));
topicConfigs.put(requestedResource, future);
}
}

return new DescribeConfigsResult(topicConfigs);
}

@Override
synchronized public DeleteTopicsResult deleteTopics(Collection<String> topicsToDelete, DeleteTopicsOptions options) {
Map<String, KafkaFuture<Void>> deleteTopicsResult = new HashMap<>();
Expand Down Expand Up @@ -535,6 +491,19 @@ synchronized public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> fil

@Override
synchronized public DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {

if (timeoutNextRequests > 0) {
Map<ConfigResource, KafkaFuture<Config>> configs = new HashMap<>();
for (ConfigResource requestedResource : resources) {
KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
future.completeExceptionally(new TimeoutException());
configs.put(requestedResource, future);
}

--timeoutNextRequests;
return new DescribeConfigsResult(configs);
}

Map<ConfigResource, KafkaFuture<Config>> results = new HashMap<>();
for (ConfigResource resource : resources) {
KafkaFutureImpl<Config> future = new KafkaFutureImpl<>();
Expand All @@ -551,7 +520,7 @@ synchronized public DescribeConfigsResult describeConfigs(Collection<ConfigResou
synchronized private Config getResourceDescription(ConfigResource resource) {
switch (resource.type()) {
case BROKER: {
int brokerId = Integer.valueOf(resource.name());
int brokerId = Integer.parseInt(resource.name());
if (brokerId >= brokerConfigs.size()) {
throw new InvalidRequestException("Broker " + resource.name() +
" not found.");
Expand All @@ -560,10 +529,15 @@ synchronized private Config getResourceDescription(ConfigResource resource) {
}
case TOPIC: {
TopicMetadata topicMetadata = allTopics.get(resource.name());
if (topicMetadata == null) {
throw new UnknownTopicOrPartitionException();
if (topicMetadata != null && !topicMetadata.markedForDeletion) {
if (topicMetadata.fetchesRemainingUntilVisible > 0)
topicMetadata.fetchesRemainingUntilVisible = Math.max(0, topicMetadata.fetchesRemainingUntilVisible - 1);
else return new Config(topicMetadata.configs.entrySet().stream()
.map(entry -> new ConfigEntry(entry.getKey(), entry.getValue()))
.collect(Collectors.toList()));
Comment on lines +535 to +537
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think that we can use toConfigObject(topicMetadata.configs) here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has been merged when I notice your comment. Let me address it in another PR :)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


}
return toConfigObject(topicMetadata.configs);
throw new UnknownTopicOrPartitionException("Resource " + resource + " not found.");
}
default:
throw new UnsupportedOperationException("Not implemented yet");
Expand Down