Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.apache.kafka.clients;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.Node;
Expand Down Expand Up @@ -50,8 +51,8 @@ public final class Metadata {
public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;

private final long refreshBackoffMs;
private final long metadataExpireMs;
private final long retryBackoffMs;
private final long metadataMaxAgeMs;
private int version;
private long lastRefreshMs;
private long lastSuccessfulRefreshMs;
Expand All @@ -68,24 +69,25 @@ public final class Metadata {
* Create a metadata instance with reasonable defaults
*/
public Metadata() {
this(100L, 60 * 60 * 1000L);
this(ConsumerConfig.DEFAULT_RETRY_BACKOFF_MS, ConsumerConfig.DEFAULT_METADATA_MAX_AGE_MS);
}

public Metadata(long refreshBackoffMs, long metadataExpireMs) {
this(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners());
public Metadata(long retryBackoffMs, long metadataMaxAgeMs) {
this(retryBackoffMs, metadataMaxAgeMs, false, new ClusterResourceListeners());
}

/**
* Create a new Metadata instance
* @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* @param retryBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* polling
* @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
* @param metadataMaxAgeMs The maximum amount of time that metadata can be retained without refresh
* @param topicExpiryEnabled If true, enable expiry of unused topics
* @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
*/
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
this.refreshBackoffMs = refreshBackoffMs;
this.metadataExpireMs = metadataExpireMs;

public Metadata(long retryBackoffMs, long metadataMaxAgeMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
this.retryBackoffMs = retryBackoffMs;
this.metadataMaxAgeMs = metadataMaxAgeMs;
this.topicExpiryEnabled = topicExpiryEnabled;
this.lastRefreshMs = 0L;
this.lastSuccessfulRefreshMs = 0L;
Expand Down Expand Up @@ -115,12 +117,12 @@ public synchronized void add(String topic) {

/**
* The next time to update the cluster info is the maximum of the time the current info will expire and the time the
* current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
* current info can be updated (i.e. backoff time has elapsed); If an update has been requested then the expiry time
* is now
*/
public synchronized long timeToNextUpdate(long nowMs) {
long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataMaxAgeMs - nowMs, 0);
long timeToAllowUpdate = this.lastRefreshMs + this.retryBackoffMs - nowMs;
return Math.max(timeToExpire, timeToAllowUpdate);
}

Expand Down Expand Up @@ -263,11 +265,18 @@ public synchronized long lastSuccessfulUpdate() {
return this.lastSuccessfulRefreshMs;
}

/**
 * Returns the configured maximum age, in milliseconds, that cached cluster
 * metadata may reach before a refresh is required.
 */
public long maxAgeMs() {
    return metadataMaxAgeMs;
}

/**
* The metadata refresh backoff in ms
*/
public long refreshBackoff() {
return refreshBackoffMs;
return retryBackoffMs;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public class ConsumerConfig extends AbstractConfig {

/** <code>metadata.max.age.ms</code> */
public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
public static final long DEFAULT_METADATA_MAX_AGE_MS = 5 * 60 * 1000L;

/**
* <code>max.partition.fetch.bytes</code>
Expand Down Expand Up @@ -163,6 +164,7 @@ public class ConsumerConfig extends AbstractConfig {
* <code>retry.backoff.ms</code>
*/
public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
public static final long DEFAULT_RETRY_BACKOFF_MS = 100L;

/**
* <code>metrics.sample.window.ms</code>
Expand Down Expand Up @@ -236,7 +238,7 @@ public class ConsumerConfig extends AbstractConfig {
PARTITION_ASSIGNMENT_STRATEGY_DOC)
.define(METADATA_MAX_AGE_CONFIG,
Type.LONG,
5 * 60 * 1000,
DEFAULT_METADATA_MAX_AGE_MS,
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_MAX_AGE_DOC)
Expand Down Expand Up @@ -300,7 +302,7 @@ public class ConsumerConfig extends AbstractConfig {
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
.define(RETRY_BACKOFF_MS_CONFIG,
Type.LONG,
100L,
DEFAULT_RETRY_BACKOFF_MS,
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,20 +220,34 @@ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, C
}

/**
* Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout
* due to metadata being unavailable
* Abort the batches that have been sitting in RecordAccumulator
* 1. longer than the requestTimeout when metadata is fresh; or
* 2. longer than {@link Sender#metadataStaleMs} when metadata is stale.
*/
public List<RecordBatch> abortExpiredBatches(int requestTimeout, long now) {
public List<RecordBatch> abortExpiredBatches(int requestTimeout, boolean isMetadataStale, Cluster cluster, long now) {
List<RecordBatch> expiredBatches = new ArrayList<>();
int count = 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
Deque<RecordBatch> dq = entry.getValue();
TopicPartition tp = entry.getKey();
// We only check if the batch should be expired if the partition does not have a batch in flight.
// This is to prevent later batches from being expired while an earlier batch is still in progress.
// Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection
// is only active in this case. Otherwise the expiration order is not guaranteed.
if (!muted.contains(tp)) {

// In the case where the user wishes to achieve strict ordering, (i.e.,
// max.in.flight.request.per.connection=1) the muted membership condition helps ensure that batches also
// expire in order. Partitions are muted only if strict ordering is enabled and there are in-flight batches.
// This prevents later batches from being expired while an earlier batch is still in progress.
//
// Second, we expire batches when we know that we can't make progress on a given topic-partition.
// Specifically, we check if
// (1) the partition does not have an in-flight batch.
// (2) Either the metadata is too stale or we don't have a leader for a partition.
//
// The second condition allows expiration of ready batches if we don't have a leader for
// the partition.
//
// Finally, we expire batches if the last metadata refresh was too long ago. I.e., > {@link Sender#metadataStaleMs}.
// We might run in to this situation when the producer is disconnected from all the brokers.
boolean guaranteeExpirationOrder = muted.contains(tp);
if (!guaranteeExpirationOrder && (isMetadataStale || cluster.leaderFor(tp) == null)) {
synchronized (dq) {
// iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut
RecordBatch lastBatch = dq.peekLast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public class Sender implements Runnable {

/* the max time to wait for the server to respond to the request*/
private final int requestTimeout;

/* the max time to wait before expiring batches. */
private final long metadataStaleMs;

public Sender(KafkaClient client,
Metadata metadata,
Expand All @@ -120,6 +123,8 @@ public Sender(KafkaClient client,
this.clientId = clientId;
this.sensors = new SenderMetrics(metrics);
this.requestTimeout = requestTimeout;

this.metadataStaleMs = getMetadataStaleMs(metadata.maxAgeMs(), requestTimeout, metadata.refreshBackoff(), retries);
}

/**
Expand Down Expand Up @@ -207,12 +212,14 @@ void run(long now) {
this.accumulator.mutePartition(batch.topicPartition);
}
}

List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

boolean stale = isMetadataStale(now, metadata, this.metadataStaleMs);
if (stale || !result.unknownLeaderTopics.isEmpty()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, is the logic here correct? Let's say there is a network issue and the producer can't talk to any broker. All leaders will still be non-null. It will take at least 300 seconds before the metadata becomes stale. Then some of the records will have to sit in the buffer pool much longer than the default 30-second request timeout before being expired. This seems like a big change in behavior.

Also, would it be better to consolidate the check here into accumulator.abortExpiredBatches()?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The condition in Sender.java is a quick check to avoid expensive iteration in abortExpiredBatches when we can. I.e., when metadata is fresh and leaders for all partitions are known there's no reason to drop into the iteration in abortExpiredBatches. We'll not expire any batches in that case.

List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, stale, cluster, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
}

sensors.updateProduceRequestMetrics(batches);
List<ClientRequest> requests = createProduceRequests(batches, now);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
Expand Down Expand Up @@ -375,6 +382,19 @@ public void wakeup() {
this.client.wakeup();
}

/**
 * Computes the staleness threshold used for batch expiry. Metadata is treated
 * as "stale" once the time since the last successful refresh exceeds this
 * value. The threshold must exceed metadata.max.age.ms by enough slack to
 * allow at least one full retry cycle — each attempt being bounded by the
 * request timeout plus the refresh backoff — so that transient network
 * disconnections do not immediately trigger expiry.
 *
 * @param metadataMaxAgeMs  configured metadata.max.age.ms
 * @param requestTimeoutMs  per-request timeout in milliseconds
 * @param refreshBackoffMs  backoff between metadata refresh attempts
 * @param retries           configured number of retries
 * @return the staleness threshold in milliseconds
 */
static long getMetadataStaleMs(long metadataMaxAgeMs, int requestTimeoutMs, long refreshBackoffMs, int retries) {
    // One extra attempt beyond the configured retries; each attempt may take
    // up to the request timeout plus the backoff before the next try.
    long perAttemptDelayMs = requestTimeoutMs + refreshBackoffMs;
    return metadataMaxAgeMs + (retries + 1) * perAttemptDelayMs;
}

/**
 * Returns true when the metadata has not been successfully refreshed within
 * the given staleness threshold, i.e. its age exceeds {@code metadataStaleMs}.
 */
static boolean isMetadataStale(long now, Metadata metadata, long metadataStaleMs) {
    long ageMs = now - metadata.lastSuccessfulUpdate();
    return ageMs > metadataStaleMs;
}
/**
* A collection of sensors for the sender
*/
Expand Down
14 changes: 7 additions & 7 deletions clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
public class MetadataTest {

private long refreshBackoffMs = 100;
private long metadataExpireMs = 1000;
private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
private long metadataMaxAgeMs = 1000;
private Metadata metadata = new Metadata(refreshBackoffMs, metadataMaxAgeMs);
private AtomicReference<String> backgroundError = new AtomicReference<String>();

@After
Expand Down Expand Up @@ -75,7 +75,7 @@ public void testMetadata() throws Exception {
t1.join();
t2.join();
assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
time += metadataExpireMs;
time += metadataMaxAgeMs;
assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0);
}

Expand Down Expand Up @@ -153,7 +153,7 @@ public void testClusterListenerGetsNotifiedOfUpdate() {
MockClusterResourceListener mockClusterListener = new MockClusterResourceListener();
ClusterResourceListeners listeners = new ClusterResourceListeners();
listeners.maybeAdd(mockClusterListener);
metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, listeners);
metadata = new Metadata(refreshBackoffMs, metadataMaxAgeMs, false, listeners);

String hostName = "www.example.com";
Cluster cluster = Cluster.bootstrap(Arrays.asList(new InetSocketAddress(hostName, 9002)));
Expand Down Expand Up @@ -246,7 +246,7 @@ public void onMetadataUpdate(Cluster cluster) {

@Test
public void testTopicExpiry() throws Exception {
metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new ClusterResourceListeners());
metadata = new Metadata(refreshBackoffMs, metadataMaxAgeMs, true, new ClusterResourceListeners());

// Test that topic is expired if not used within the expiry interval
long time = 0;
Expand Down Expand Up @@ -278,7 +278,7 @@ public void testTopicExpiry() throws Exception {

@Test
public void testNonExpiringMetadata() throws Exception {
metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners());
metadata = new Metadata(refreshBackoffMs, metadataMaxAgeMs, false, new ClusterResourceListeners());

// Test that topic is not expired if not used within the expiry interval
long time = 0;
Expand All @@ -302,7 +302,7 @@ public void testNonExpiringMetadata() throws Exception {
HashSet<String> topics = new HashSet<>();
topics.add("topic4");
metadata.setTopics(topics);
time += metadataExpireMs * 2;
time += metadataMaxAgeMs * 2;
metadata.update(Cluster.empty(), time);
assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4"));
}
Expand Down
Loading