Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,28 @@ public final class Metadata {
private final List<Listener> listeners;
private final ClusterResourceListeners clusterResourceListeners;
private boolean needMetadataForAllTopics;
private final boolean allowAutoTopicCreation;
private final boolean topicExpiryEnabled;

/**
* Create a metadata instance with reasonable defaults
*/
public Metadata() {
this(100L, 60 * 60 * 1000L);
}

public Metadata(long refreshBackoffMs, long metadataExpireMs) {
this(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners());
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation) {
this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, false, new ClusterResourceListeners());
}

/**
* Create a new Metadata instance
* @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* polling
* @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
* @param allowAutoTopicCreation If this and the broker config 'auto.create.topics.enable' are true, topics that
* don't exist will be created by the broker when a metadata request is sent
* @param topicExpiryEnabled If true, enable expiry of unused topics
* @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
*/
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation,
boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;
this.allowAutoTopicCreation = allowAutoTopicCreation;
this.topicExpiryEnabled = topicExpiryEnabled;
this.lastRefreshMs = 0L;
this.lastSuccessfulRefreshMs = 0L;
Expand Down Expand Up @@ -275,6 +273,10 @@ public synchronized long lastSuccessfulUpdate() {
return this.lastSuccessfulRefreshMs;
}

public boolean allowAutoTopicCreation() {
return allowAutoTopicCreation;
}

/**
* Set state to indicate if metadata for all topics in Kafka cluster is required or not.
* @param needMetadataForAllTopics boolean indicating need for metadata of all topics in cluster.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,8 @@ private long maybeUpdate(long now, Node node) {
if (metadata.needMetadataForAllTopics())
metadataRequest = MetadataRequest.Builder.allTopics();
else
metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()));
metadataRequest = new MetadataRequest.Builder(new ArrayList<>(metadata.topics()),
metadata.allowAutoTopicCreation());


log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,6 @@ public DescribeTopicsResult describeTopics(Collection<String> topicNames) {
/**
* Describe some topics in the cluster.
*
* Note that if auto.create.topics.enable is true on the brokers,
* describeTopics(topicName, ...) may create a topic named topicName.
* There are two workarounds: either use AdminClient#listTopics and ensure
* that the topic is present before describing, or disable
* auto.create.topics.enable.
*
* @param topicNames The names of the topics to describe.
* @param options The options to use when describing the topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ static KafkaAdminClient createInternal(AdminClientConfig config) {

try {
metadata = new Metadata(config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG));
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), false);
List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
Expand Down Expand Up @@ -1144,7 +1144,7 @@ public DescribeTopicsResult describeTopics(final Collection<String> topicNames,
final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size());
final ArrayList<String> topicNamesList = new ArrayList<>();
for (String topicName : topicNames) {
if (topicFutures.get(topicName) == null) {
if (!topicFutures.containsKey(topicName)) {
topicFutures.put(topicName, new KafkaFutureImpl<TopicDescription>());
topicNamesList.add(topicName);
}
Expand All @@ -1153,9 +1153,14 @@ public DescribeTopicsResult describeTopics(final Collection<String> topicNames,
runnable.call(new Call("describeTopics", calcDeadlineMs(now, options.timeoutMs()),
new ControllerNodeProvider()) {

private boolean supportsDisablingTopicCreation = true;

@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(topicNamesList);
if (supportsDisablingTopicCreation)
return new MetadataRequest.Builder(topicNamesList, false);
else
return MetadataRequest.Builder.allTopics();
}

@Override
Expand Down Expand Up @@ -1189,6 +1194,15 @@ void handleResponse(AbstractResponse abstractResponse) {
}
}

@Override
boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
if (supportsDisablingTopicCreation) {
supportsDisablingTopicCreation = false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works for now since we only support disabling auto topic creation in the latest version. However, if we bump up the version of metadata request again in the future, this check may be too restrictive. If this requires more changes, perhaps we can at least add a comment for now.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would still work in that case. We only ever need to fall back once: if the highest version of metadata request supported by the broker is v3 or lower. The first attempt will fail and the subsequent one will work.

To make it more concrete, say we introduce metadata request v5. If the broker supports v4 or v5, then it will just work. If the broker only supports v3, it's the same as now: we fail on the first attempt and then we fallback to not using the flag. Does that make sense?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation. That works then.

return true;
}
return false;
}

@Override
void handleFailure(Throwable throwable) {
completeAllExceptionally(topicFutures.values(), throwable);
Expand All @@ -1208,7 +1222,7 @@ public DescribeClusterResult describeCluster(DescribeClusterOptions options) {

@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new MetadataRequest.Builder(Collections.<String>emptyList());
return new MetadataRequest.Builder(Collections.<String>emptyList(), false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,8 @@ private KafkaConsumer(ConsumerConfig config,
this.valueDeserializer = valueDeserializer;
}
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
true, false, clusterResourceListeners);
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
String metricGrpPrefix = "consumer";
Expand Down Expand Up @@ -1352,7 +1353,7 @@ public List<PartitionInfo> partitionsFor(String topic) {
return parts;

Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
new MetadataRequest.Builder(Collections.singletonList(topic)), requestTimeoutMs);
new MetadataRequest.Builder(Collections.singletonList(topic), true), requestTimeoutMs);
return topicMetadata.get(topic);
} finally {
release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serial
ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
true, true, clusterResourceListeners);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,16 @@ public class Protocol {
/* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */
public static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2;

/* The v4 metadata request has an additional field for allowing auto topic creation. The response is the same as v3. */
public static final Schema METADATA_REQUEST_V4 = new Schema(new Field("topics",
ArrayOf.nullable(STRING),
"An array of topics to fetch metadata for. If the topics array is null fetch metadata for all topics."),
new Field("allow_auto_topic_creation",
BOOLEAN,
"If this and the broker config 'auto.create.topics.enable' are true, " +
"topics that don't exist will be created by the broker. " +
"Otherwise, no topics will be created by the broker."));

public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."),
new Field("host", STRING, "The hostname of the broker."),
new Field("port", INT32,
Expand Down Expand Up @@ -142,8 +152,10 @@ public class Protocol {
"The broker id of the controller broker."),
new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));

public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3};
public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3};
public static final Schema METADATA_RESPONSE_V4 = METADATA_RESPONSE_V3;

public static final Schema[] METADATA_REQUEST = {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2, METADATA_REQUEST_V3, METADATA_REQUEST_V4};
public static final Schema[] METADATA_RESPONSE = {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3, METADATA_RESPONSE_V4};

/* Produce api */

Expand Down Expand Up @@ -1181,8 +1193,8 @@ public class Protocol {
new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."),
newThrottleTimeField());

public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1};
public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};
public static final Schema[] API_VERSIONS_REQUEST = {API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1};
public static final Schema[] API_VERSIONS_RESPONSE = {API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1};

/* Admin requests common */
public static final Schema CONFIG_ENTRY = new Schema(new Field("config_name", STRING, "Configuration name"),
Expand Down Expand Up @@ -1636,8 +1648,8 @@ public class Protocol {
new ArrayOf(DESCRIBE_ACLS_RESOURCE),
"The resources and their associated ACLs."));

public static final Schema[] DESCRIBE_ACLS_REQUEST = new Schema[] {DESCRIBE_ACLS_REQUEST_V0};
public static final Schema[] DESCRIBE_ACLS_RESPONSE = new Schema[] {DESCRIBE_ACLS_RESPONSE_V0};
public static final Schema[] DESCRIBE_ACLS_REQUEST = {DESCRIBE_ACLS_REQUEST_V0};
public static final Schema[] DESCRIBE_ACLS_RESPONSE = {DESCRIBE_ACLS_RESPONSE_V0};

public static final Schema CREATE_ACLS_REQUEST_V0 = new Schema(
new Field("creations",
Expand All @@ -1658,8 +1670,8 @@ public class Protocol {
new Field("error_message", NULLABLE_STRING, "The error message.")
))));

public static final Schema[] CREATE_ACLS_REQUEST = new Schema[] {CREATE_ACLS_REQUEST_V0};
public static final Schema[] CREATE_ACLS_RESPONSE = new Schema[] {CREATE_ACLS_RESPONSE_V0};
public static final Schema[] CREATE_ACLS_REQUEST = {CREATE_ACLS_REQUEST_V0};
public static final Schema[] CREATE_ACLS_RESPONSE = {CREATE_ACLS_RESPONSE_V0};

public static final Schema DELETE_ACLS_REQUEST_V0 = new Schema(
new Field("filters",
Expand Down Expand Up @@ -1691,8 +1703,8 @@ public class Protocol {
new Field("error_message", NULLABLE_STRING, "The error message."),
new Field("matching_acls", new ArrayOf(MATCHING_ACL), "The matching ACLs")))));

public static final Schema[] DELETE_ACLS_REQUEST = new Schema[] {DELETE_ACLS_REQUEST_V0};
public static final Schema[] DELETE_ACLS_RESPONSE = new Schema[] {DELETE_ACLS_RESPONSE_V0};
public static final Schema[] DELETE_ACLS_REQUEST = {DELETE_ACLS_REQUEST_V0};
public static final Schema[] DELETE_ACLS_RESPONSE = {DELETE_ACLS_RESPONSE_V0};

/* an array of all requests and responses with all schema versions; a null value in the inner array means that the
* particular version is not supported */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,26 @@

public class MetadataRequest extends AbstractRequest {

private static final String TOPICS_KEY_NAME = "topics";
private static final String ALLOW_AUTO_TOPIC_CREATION_KEY_NAME = "allow_auto_topic_creation";

public static class Builder extends AbstractRequest.Builder<MetadataRequest> {
private static final List<String> ALL_TOPICS = null;

// The list of topics, or null if we want to request metadata about all topics.
private final List<String> topics;
private final boolean allowAutoTopicCreation;

public static Builder allTopics() {
return new Builder(ALL_TOPICS);
// This never causes auto-creation, but we set the boolean to true because that is the default value when
// deserializing V2 and older. This way, the value is consistent after serialization and deserialization.
return new Builder(ALL_TOPICS, true);
}

public Builder(List<String> topics) {
public Builder(List<String> topics, boolean allowAutoTopicCreation) {
super(ApiKeys.METADATA);
this.topics = topics;
this.allowAutoTopicCreation = allowAutoTopicCreation;
}

public List<String> topics() {
Expand All @@ -55,11 +62,12 @@ public boolean isAllTopics() {

@Override
public MetadataRequest build(short version) {
if (version < 1) {
throw new UnsupportedVersionException("MetadataRequest " +
"versions older than 1 are not supported.");
}
return new MetadataRequest(this.topics, version);
if (version < 1)
throw new UnsupportedVersionException("MetadataRequest versions older than 1 are not supported.");
if (!allowAutoTopicCreation && version < 4)
throw new UnsupportedVersionException("MetadataRequest versions older than 4 don't support the " +
"allowAutoTopicCreation field");
return new MetadataRequest(this.topics, allowAutoTopicCreation, version);
}

@Override
Expand All @@ -77,18 +85,18 @@ public String toString() {
}
}

private static final String TOPICS_KEY_NAME = "topics";

private final List<String> topics;
private final boolean allowAutoTopicCreation;

/**
* In v0 null is not allowed and an empty list indicates requesting all topics.
* Note: modern clients do not support sending v0 requests.
* In v1 null indicates requesting all topics, and an empty list indicates requesting no topics.
*/
public MetadataRequest(List<String> topics, short version) {
public MetadataRequest(List<String> topics, boolean allowAutoTopicCreation, short version) {
super(version);
this.topics = topics;
this.allowAutoTopicCreation = allowAutoTopicCreation;
}

public MetadataRequest(Struct struct, short version) {
Expand All @@ -102,6 +110,10 @@ public MetadataRequest(Struct struct, short version) {
} else {
topics = null;
}
if (struct.hasField(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME))
allowAutoTopicCreation = struct.getBoolean(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME);
else
allowAutoTopicCreation = true;
}

@Override
Expand All @@ -122,6 +134,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
case 2:
return new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
case 3:
case 4:
return new MetadataResponse(throttleTimeMs, Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas);
default:
throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
Expand All @@ -137,6 +150,10 @@ public List<String> topics() {
return topics;
}

public boolean allowAutoTopicCreation() {
return allowAutoTopicCreation;
}

public static MetadataRequest parse(ByteBuffer buffer, short version) {
return new MetadataRequest(ApiKeys.METADATA.parseRequest(version, buffer), version);
}
Expand All @@ -148,6 +165,8 @@ protected Struct toStruct() {
struct.set(TOPICS_KEY_NAME, null);
else
struct.set(TOPICS_KEY_NAME, topics.toArray());
if (struct.hasField(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME))
struct.set(ALLOW_AUTO_TOPIC_CREATION_KEY_NAME, allowAutoTopicCreation);
return struct;
}
}
Loading