diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index f717001f4deaa..0e7bb7b598bff 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -12,6 +12,7 @@
*/
package org.apache.kafka.clients;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.Node;
@@ -50,8 +51,8 @@ public final class Metadata {
public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000;
private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L;
- private final long refreshBackoffMs;
- private final long metadataExpireMs;
+ private final long retryBackoffMs;
+ private final long metadataMaxAgeMs;
private int version;
private long lastRefreshMs;
private long lastSuccessfulRefreshMs;
@@ -68,24 +69,25 @@ public final class Metadata {
* Create a metadata instance with reasonable defaults
*/
public Metadata() {
- this(100L, 60 * 60 * 1000L);
+ this(ConsumerConfig.DEFAULT_RETRY_BACKOFF_MS, ConsumerConfig.DEFAULT_METADATA_MAX_AGE_MS);
}
- public Metadata(long refreshBackoffMs, long metadataExpireMs) {
- this(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners());
+ public Metadata(long retryBackoffMs, long metadataMaxAgeMs) {
+ this(retryBackoffMs, metadataMaxAgeMs, false, new ClusterResourceListeners());
}
/**
* Create a new Metadata instance
- * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
+ * @param retryBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
* polling
- * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
+ * @param metadataMaxAgeMs The maximum amount of time that metadata can be retained without refresh
* @param topicExpiryEnabled If true, enable expiry of unused topics
* @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates.
*/
- public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
- this.refreshBackoffMs = refreshBackoffMs;
- this.metadataExpireMs = metadataExpireMs;
+
+ public Metadata(long retryBackoffMs, long metadataMaxAgeMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
+ this.retryBackoffMs = retryBackoffMs;
+ this.metadataMaxAgeMs = metadataMaxAgeMs;
this.topicExpiryEnabled = topicExpiryEnabled;
this.lastRefreshMs = 0L;
this.lastSuccessfulRefreshMs = 0L;
@@ -115,12 +117,12 @@ public synchronized void add(String topic) {
/**
* The next time to update the cluster info is the maximum of the time the current info will expire and the time the
- * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
+ * current info can be updated (i.e. backoff time has elapsed); If an update has been requested then the expiry time
* is now
*/
public synchronized long timeToNextUpdate(long nowMs) {
- long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
- long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
+ long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataMaxAgeMs - nowMs, 0);
+ long timeToAllowUpdate = this.lastRefreshMs + this.retryBackoffMs - nowMs;
return Math.max(timeToExpire, timeToAllowUpdate);
}
@@ -263,11 +265,18 @@ public synchronized long lastSuccessfulUpdate() {
return this.lastSuccessfulRefreshMs;
}
+ /**
+ * The max allowable age of metadata.
+ */
+ public long maxAgeMs() {
+ return this.metadataMaxAgeMs;
+ }
+
/**
* The metadata refresh backoff in ms
*/
public long refreshBackoff() {
- return refreshBackoffMs;
+ return retryBackoffMs;
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index ae791b0d46dbf..b72a4d454e414 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -131,6 +131,7 @@ public class ConsumerConfig extends AbstractConfig {
/** metadata.max.age.ms */
public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+ public static final long DEFAULT_METADATA_MAX_AGE_MS = 5 * 60 * 1000L;
/**
* max.partition.fetch.bytes
@@ -163,6 +164,7 @@ public class ConsumerConfig extends AbstractConfig {
* retry.backoff.ms
*/
public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
+ public static final long DEFAULT_RETRY_BACKOFF_MS = 100L;
/**
* metrics.sample.window.ms
@@ -236,7 +238,7 @@ public class ConsumerConfig extends AbstractConfig {
PARTITION_ASSIGNMENT_STRATEGY_DOC)
.define(METADATA_MAX_AGE_CONFIG,
Type.LONG,
- 5 * 60 * 1000,
+ DEFAULT_METADATA_MAX_AGE_MS,
atLeast(0),
Importance.LOW,
CommonClientConfigs.METADATA_MAX_AGE_DOC)
@@ -300,7 +302,7 @@ public class ConsumerConfig extends AbstractConfig {
CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
.define(RETRY_BACKOFF_MS_CONFIG,
Type.LONG,
- 100L,
+ DEFAULT_RETRY_BACKOFF_MS,
atLeast(0L),
Importance.LOW,
CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index fa1e51352626b..50d793b2878d8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -220,20 +220,34 @@ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, C
}
/**
- * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout
- * due to metadata being unavailable
+ * Abort the batches that have been sitting in RecordAccumulator
+ * 1. longer than the requestTimeout when metadata is fresh; or
+ * 2. longer than {@link Sender#metadataStaleMs} when metadata is stale.
*/
- public List abortExpiredBatches(int requestTimeout, long now) {
+ public List abortExpiredBatches(int requestTimeout, boolean isMetadataStale, Cluster cluster, long now) {
List expiredBatches = new ArrayList<>();
int count = 0;
for (Map.Entry> entry : this.batches.entrySet()) {
Deque dq = entry.getValue();
TopicPartition tp = entry.getKey();
- // We only check if the batch should be expired if the partition does not have a batch in flight.
- // This is to prevent later batches from being expired while an earlier batch is still in progress.
- // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection
- // is only active in this case. Otherwise the expiration order is not guaranteed.
- if (!muted.contains(tp)) {
+
+ // In the case where the user wishes to achieve strict ordering, (i.e.,
+ // max.in.flight.request.per.connection=1) the muted membership condition helps ensure that batches also
+ // expire in order. Partitions are muted only if strict ordering is enabled and there are in-flight batches.
+ // This prevents later batches from being expired while an earlier batch is still in progress.
+ //
+ // Second, we expire batches when we know that we can't make progress on a given topic-partition.
+ // Specifically, we check if
+ // (1) the partition does not have an in-flight batch.
+ // (2) Either the metadata is too stale or we don't have a leader for a partition.
+ //
+ // The second condition allows expiration of ready batches if we don't have a leader for
+ // the partition.
+ //
+ // Finally, we expire batches if the last metadata refresh was too long ago. I.e., > {@link Sender#metadataStaleMs}.
+            // We might run into this situation when the producer is disconnected from all the brokers.
+ boolean guaranteeExpirationOrder = muted.contains(tp);
+ if (!guaranteeExpirationOrder && (isMetadataStale || cluster.leaderFor(tp) == null)) {
synchronized (dq) {
// iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut
RecordBatch lastBatch = dq.peekLast();
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 30f888773c2cc..7a0b61fa7fedc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -96,6 +96,9 @@ public class Sender implements Runnable {
/* the max time to wait for the server to respond to the request*/
private final int requestTimeout;
+
+ /* the max time to wait before expiring batches. */
+ private final long metadataStaleMs;
public Sender(KafkaClient client,
Metadata metadata,
@@ -120,6 +123,8 @@ public Sender(KafkaClient client,
this.clientId = clientId;
this.sensors = new SenderMetrics(metrics);
this.requestTimeout = requestTimeout;
+
+ this.metadataStaleMs = getMetadataStaleMs(metadata.maxAgeMs(), requestTimeout, metadata.refreshBackoff(), retries);
}
/**
@@ -207,12 +212,14 @@ void run(long now) {
this.accumulator.mutePartition(batch.topicPartition);
}
}
-
- List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
- // update sensors
- for (RecordBatch expiredBatch : expiredBatches)
- this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
-
+ boolean stale = isMetadataStale(now, metadata, this.metadataStaleMs);
+ if (stale || !result.unknownLeaderTopics.isEmpty()) {
+ List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, stale, cluster, now);
+ // update sensors
+ for (RecordBatch expiredBatch : expiredBatches)
+ this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
+ }
+
sensors.updateProduceRequestMetrics(batches);
List requests = createProduceRequests(batches, now);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
@@ -375,6 +382,19 @@ public void wakeup() {
this.client.wakeup();
}
+    /* metadata becomes "stale" for batch expiry purposes when the time since the last successful update exceeds
+ * the metadataStaleMs value. This value must be greater than the metadata.max.age and some delta to account
+ * for retries and transient network disconnections. There must be enough time for at least one retry.
+ * As retries are subject to both regular request timeout and the backoff, staleness determination is delayed
+ * by that factor.
+ */
+ static long getMetadataStaleMs(long metadataMaxAgeMs, int requestTimeoutMs, long refreshBackoffMs, int retries) {
+ return metadataMaxAgeMs + (retries + 1) * (requestTimeoutMs + refreshBackoffMs);
+ }
+
+ static boolean isMetadataStale(long now, Metadata metadata, long metadataStaleMs) {
+ return (now - metadata.lastSuccessfulUpdate()) > metadataStaleMs;
+ }
/**
* A collection of sensors for the sender
*/
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 333a072065bb4..bfa934f5ad7fa 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -40,8 +40,8 @@
public class MetadataTest {
private long refreshBackoffMs = 100;
- private long metadataExpireMs = 1000;
- private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs);
+ private long metadataMaxAgeMs = 1000;
+ private Metadata metadata = new Metadata(refreshBackoffMs, metadataMaxAgeMs);
private AtomicReference backgroundError = new AtomicReference();
@After
@@ -75,7 +75,7 @@ public void testMetadata() throws Exception {
t1.join();
t2.join();
assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
- time += metadataExpireMs;
+ time += metadataMaxAgeMs;
assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0);
}
@@ -153,7 +153,7 @@ public void testClusterListenerGetsNotifiedOfUpdate() {
MockClusterResourceListener mockClusterListener = new MockClusterResourceListener();
ClusterResourceListeners listeners = new ClusterResourceListeners();
listeners.maybeAdd(mockClusterListener);
- metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, listeners);
+ metadata = new Metadata(refreshBackoffMs, metadataMaxAgeMs, false, listeners);
String hostName = "www.example.com";
Cluster cluster = Cluster.bootstrap(Arrays.asList(new InetSocketAddress(hostName, 9002)));
@@ -246,7 +246,7 @@ public void onMetadataUpdate(Cluster cluster) {
@Test
public void testTopicExpiry() throws Exception {
- metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new ClusterResourceListeners());
+ metadata = new Metadata(refreshBackoffMs, metadataMaxAgeMs, true, new ClusterResourceListeners());
// Test that topic is expired if not used within the expiry interval
long time = 0;
@@ -278,7 +278,7 @@ public void testTopicExpiry() throws Exception {
@Test
public void testNonExpiringMetadata() throws Exception {
- metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners());
+ metadata = new Metadata(refreshBackoffMs, metadataMaxAgeMs, false, new ClusterResourceListeners());
// Test that topic is not expired if not used within the expiry interval
long time = 0;
@@ -302,7 +302,7 @@ public void testNonExpiringMetadata() throws Exception {
HashSet topics = new HashSet<>();
topics.add("topic4");
metadata.setTopics(topics);
- time += metadataExpireMs * 2;
+ time += metadataMaxAgeMs * 2;
metadata.update(Cluster.empty(), time);
assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4"));
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 216f07eb8f644..21a2ec16c835f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -30,11 +30,13 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.LogEntry;
@@ -344,72 +346,124 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
@Test
public void testExpiredBatches() throws InterruptedException {
long retryBackoffMs = 100L;
- long lingerMs = 3000L;
- int requestTimeout = 60;
-
- RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time);
- int appends = 1024 / msgSize;
+ long lingerMs = 500L;
+ int requestTimeoutMs = 2000;
+ int batchSize = 1024;
+ long totalSize = 10 * 1024;
+ long metadataMaxAgeMs = 5 * 60 * 1000L; // 5 min
+ int retries = 1;
+ long staleMetadataAgeMs = Sender.getMetadataStaleMs(metadataMaxAgeMs, requestTimeoutMs, retryBackoffMs, retries);
+ List expiredBatches;
+ RecordAccumulator.ReadyCheckResult result;
+ Set readyNodes;
+ Map> drained;
+ boolean isMetadataStale = false;
+
+        assertTrue("Stale metadata age must be more than metadata max age", staleMetadataAgeMs > metadataMaxAgeMs);
+
+ Metadata metadata = new Metadata(retryBackoffMs, metadataMaxAgeMs, false, new ClusterResourceListeners());
+ metadata.update(cluster, time.milliseconds());
+ RecordAccumulator accum = new RecordAccumulator(batchSize, totalSize, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time);
+ int appends = batchSize / msgSize;
// Test batches not in retry
for (int i = 0; i < appends; i++) {
accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
}
- // Make the batches ready due to batch full
+ // subtest1: Make the batches ready due to batch full
accum.append(tp1, 0L, key, value, null, 0);
- Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
- assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
- // Advance the clock to expire the batch.
- time.sleep(requestTimeout + 1);
- accum.mutePartition(tp1);
- List expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
- assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
+ result = accum.ready(cluster, time.milliseconds());
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node1), result.readyNodes);
- accum.unmutePartition(tp1);
- expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
- assertEquals("The batch should be expired", 1, expiredBatches.size());
- assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
+ isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs);
+ expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds());
+ assertTrue("The batch should not expire because leaders are known and metadata is fresh.",
+ result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0);
+ // subtest1 done
- // Advance the clock to make the next batch ready due to linger.ms
- time.sleep(lingerMs);
- assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
- time.sleep(requestTimeout + 1);
+ // subtest2: test effects of mute-unmute on ready and expired batches
+ accum.mutePartition(tp1); // a batch is now in-flight.
- accum.mutePartition(tp1);
- expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
- assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size());
+ result = accum.ready(cluster, time.milliseconds());
+ assertTrue("No partition should be ready because it's muted", result.readyNodes.isEmpty());
+
+ isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs);
+ expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds());
+ assertTrue("The batch should not expire because the leaders are known and metadata is fresh.",
+ result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0);
accum.unmutePartition(tp1);
- expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
- assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size());
- assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
+ // subtest2 done
- // Test batches in retry.
- // Create a retried batch
- accum.append(tp1, 0L, key, value, null, 0);
- time.sleep(lingerMs);
- readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
- assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
- Map> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
- assertEquals("There should be only one batch.", drained.get(node1.id()).size(), 1);
- time.sleep(1000L);
- accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds());
+ // subtest3: test effect of linger on ready and expired batches
+ time.sleep(lingerMs + 1); // Advance the clock beyond linger.ms.
+ result = accum.ready(cluster, time.milliseconds());
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node1), result.readyNodes);
+ // subtest3 done
+
+ // subtest4: test the effect of request timeout.
+ time.sleep(requestTimeoutMs);
+ isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs);
+ expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds());
+ assertTrue("The batch should not expire because the leaders are known and metadata is fresh.",
+ result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0);
+ // subtest4 done
+
+ // drain first batch.
+ drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ assertEquals("There should be one batch.", drained.get(node1.id()).size(), 1);
- // test expiration.
- time.sleep(requestTimeout + retryBackoffMs);
- expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
- assertEquals("The batch should not be expired.", 0, expiredBatches.size());
- time.sleep(1L);
+ // drain the second batch.
+ drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
+ assertEquals("Drain again. There should be one batch.", drained.get(node1.id()).size(), 1);
- accum.mutePartition(tp1);
- expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
- assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());
+ // subtest5: Test batches in retry. Create a retried batch
+ accum.append(tp1, 0L, key, value, null, 0);
+ time.sleep(lingerMs + 1);
+ result = accum.ready(cluster, time.milliseconds());
+ assertEquals("Our partition's leader should be ready", Collections.singleton(node1), result.readyNodes);
+ time.sleep(requestTimeoutMs);
+ accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds());
+ isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs);
+ expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds());
+ assertTrue("The batch should not expire because the leaders are known and metadata is fresh.",
+ result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0);
+ // subtest5 done.
+
+        // subtest6: Test metadata expiry
+ time.sleep(staleMetadataAgeMs);
+ isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs);
+ expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds());
+ assertEquals("The batch should expire because metadata is no longer fresh.", 2, expiredBatches.size());
+ // subtest6 done
+
+ // subtest7: Test batches with missing leader.
+ int partition4 = 3;
+ TopicPartition tp4 = new TopicPartition(topic, partition4);
+ PartitionInfo part4 = new PartitionInfo(topic, partition4, null, null, null);
+ Map extra = Collections.singletonMap(tp4, part4);
+ cluster = cluster.withPartitions(extra);
+
+ // update metadata
+ metadata.update(cluster, time.milliseconds());
+
+ accum.append(tp4, 0L, key, value, null, 0);
+ time.sleep(lingerMs + 1);
- accum.unmutePartition(tp1);
- expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds());
- assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size());
+ result = accum.ready(cluster, time.milliseconds());
+ assertEquals("Node should not be ready", 0, result.readyNodes.size());
+ isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs);
+ expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds());
+ assertEquals("The batch should not expire because request timeout has not passed.", 0, expiredBatches.size());
+
+ time.sleep(requestTimeoutMs + 1);
+ isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs);
+ expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds());
+ assertEquals("The batch should expire because request timeout has passed.", 1, expiredBatches.size());
+ // subtest7 done
}
-
+
@Test
public void testMutedPartitions() throws InterruptedException {
long now = time.milliseconds();
@@ -441,4 +495,5 @@ public void testMutedPartitions() throws InterruptedException {
drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds());
assertTrue("The batch should have been drained.", drained.get(node1.id()).size() > 0);
}
+
}