From 7561b81254d4361122c3467b741f98f37a769b29 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Sun, 13 Mar 2016 16:57:52 -0700 Subject: [PATCH 1/9] KAFKA-3388: Fix expiration of batches sitting in the accumulator --- .../producer/internals/RecordAccumulator.java | 36 +++++++----- .../producer/internals/RecordBatch.java | 21 +++++-- .../internals/RecordAccumulatorTest.java | 56 ++++++++++++++++--- 3 files changed, 85 insertions(+), 28 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index beaa832d3889e..cccc71c2373ea 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -217,20 +217,28 @@ public List abortExpiredBatches(int requestTimeout, Cluster cluster int count = 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque dq = entry.getValue(); - synchronized (dq) { - // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut - Iterator batchIterator = dq.iterator(); - while (batchIterator.hasNext()) { - RecordBatch batch = batchIterator.next(); - // check if the batch is expired - if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) { - expiredBatches.add(batch); - count++; - batchIterator.remove(); - deallocate(batch); - } else { - if (!batch.inRetry()) { - break; + // Only expire a batch if its metadata is not available. + if (cluster.leaderFor(entry.getKey()) == null) { + synchronized (dq) { + // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut + RecordBatch lastBatch = dq.peekLast(); + Iterator batchIterator = dq.iterator(); + while (batchIterator.hasNext()) { + RecordBatch batch = batchIterator.next(); + // We have to close the previous batches in the deque before expiration check, otherwise those + // batches may never be considered as full. + if (batch != lastBatch) + batch.records.close(); + // check if the batch is expired + if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs)) { + expiredBatches.add(batch); + count++; + batchIterator.remove(); + deallocate(batch); + } else { + if (!batch.inRetry()) { + break; + } } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 7b5fbbe0736dd..d68e517e98995 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -134,14 +134,25 @@ public String toString() { } /** - * Expire the batch that is ready but is sitting in accumulator for more than requestTimeout due to metadata being unavailable. - * We need to explicitly check if the record is full or linger time is met because the accumulator's partition may not be ready - * if the leader is unavailable. + * A batch whose metadata is not available should be expired if one of the following is true: + *
    + *
  1. the batch is not in retry AND request timeout has eplapsed after it is ready. (We need to see if a batch is + * ready by explicitly checking if the record is full or linger time is met because the accumulator's partition + * may not be ready if the leader is unavailable.) + *
  2. the batch is in retry AND request timeout has elapsed after the backoff period ended. + *
*/ - public boolean maybeExpire(int requestTimeout, long now, long lingerMs) { + public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs) { boolean expire = false; - if ((this.records.isFull() && requestTimeout < (now - this.lastAppendTime)) || requestTimeout < (now - (this.lastAttemptMs + lingerMs))) { + + if (!this.inRetry() && this.records.isFull() && requestTimeoutMs < (now - this.lastAppendTime)) + expire = true; + else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs))) expire = true; + else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs))) + expire = true; + + if (expire) { this.records.close(); this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Batch Expired")); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 366027286181d..f4d8913d08180 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -297,22 +297,60 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { @Test public void testExpiredBatches() throws InterruptedException { - long now = time.milliseconds(); - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time); + long retryBackoffMs = 100L; + long lingerMs = 3000L; + int requestTimeout = 60; + + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); int appends = 1024 / msgSize; + + // Test batches not in retry for (int i = 0; i < appends; i++) { accum.append(tp1, 0L, key, value, null, maxBlockTimeMs); - assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); } - time.sleep(2000); - accum.ready(cluster, now); + // Make the batches ready due to batch full accum.append(tp1, 0L, key, value, null, 0); Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); - Cluster cluster = new Cluster(new ArrayList(), new ArrayList(), Collections.emptySet()); - now = time.milliseconds(); - List expiredBatches = accum.abortExpiredBatches(60, cluster, now); - assertEquals(1, expiredBatches.size()); + // Advance the clock to expire the batch. + time.sleep(2000L); + List expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired when metadata is still available", 0, expiredBatches.size()); + Cluster emptyCluster = new Cluster(new ArrayList(), new ArrayList(), Collections.emptySet()); + expiredBatches = accum.abortExpiredBatches(60, emptyCluster, time.milliseconds()); + assertEquals("The batch should be expired when metadata is missing", 1, expiredBatches.size()); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); + // Advance the clock to make batch ready due to linger.ms + time.sleep(2000L); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); + // Advance the clock to make the batch expire + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired when metadata is still available", 0, expiredBatches.size()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, time.milliseconds()); + assertEquals("The batch should be expired when metadata is missing", 1, expiredBatches.size()); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); + + // Test batches in retry. + // Create a retried batch + accum.append(tp1, 0L, key, value, null, 0); + time.sleep(lingerMs); + readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); + Map> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertEquals("There should be only one batch.", drained.get(node1.id()).size(), 1); + time.sleep(1000L); + accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); + // test expiration. + time.sleep(requestTimeout + retryBackoffMs); + expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, time.milliseconds()); + assertEquals("The batch should not be expired.", 0, expiredBatches.size()); + time.sleep(1L); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired when metadata is still available", 0, expiredBatches.size()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, time.milliseconds()); + assertEquals("The batch should be expired.", 1, expiredBatches.size()); + } @Test From 6ff4abb0681e1cde90343d09132ec30f442815c8 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 14 Mar 2016 11:45:57 -0700 Subject: [PATCH 2/9] Addressed Ismael and Mayuresh's comments. --- .../org/apache/kafka/clients/Metadata.java | 4 +++ .../producer/internals/RecordAccumulator.java | 12 +++++++-- .../producer/internals/RecordBatch.java | 2 +- .../clients/producer/internals/Sender.java | 2 +- .../internals/RecordAccumulatorTest.java | 25 +++++++++++-------- 5 files changed, 31 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 73a9f333cc7a4..9af667c7d4104 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -179,6 +179,10 @@ public synchronized void update(Cluster cluster, long now) { public synchronized void failedUpdate(long now) { this.lastRefreshMs = now; } + + public synchronized long lastRefresh() { + return this.lastRefreshMs; + } /** * @return The current metadata version diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index cccc71c2373ea..3f7e675a5989e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -13,6 +13,8 @@ package org.apache.kafka.clients.producer.internals; import java.util.Iterator; + +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; @@ -212,13 +214,19 @@ public RecordAppendResult append(TopicPartition tp, * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout * due to metadata being unavailable */ - public List abortExpiredBatches(int requestTimeout, Cluster cluster, long now) { + public List abortExpiredBatches(int requestTimeout, Metadata metadata, long now) { List expiredBatches = new ArrayList(); int count = 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque dq = entry.getValue(); // Only expire a batch if its metadata is not available. - if (cluster.leaderFor(entry.getKey()) == null) { + Cluster cluster = metadata.fetch(); + // Check if the metadata might be old. + boolean maybeStaleMetadata = metadata.lastRefresh() > metadata.lastSuccessfulUpdate(); + // We will check all the batches if metadata might be old, this is to avoid the case that no batches will + // be timeout out when all the brokers are done and producer keeps the stale metadata without timing out + // the batches. + if (maybeStaleMetadata || cluster.leaderFor(entry.getKey()) == null) { synchronized (dq) { // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index d68e517e98995..fae41f69e3f03 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -136,7 +136,7 @@ public String toString() { /** * A batch whose metadata is not available should be expired if one of the following is true: *
    - *
  1. the batch is not in retry AND request timeout has eplapsed after it is ready. (We need to see if a batch is + *
  2. the batch is not in retry AND request timeout has elapsed after it is ready. (We need to see if a batch is * ready by explicitly checking if the record is full or linger time is met because the accumulator's partition * may not be ready if the leader is unavailable.) *
  3. the batch is in retry AND request timeout has elapsed after the backoff period ended. diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index db8918c2a49dd..084773e821a98 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -202,7 +202,7 @@ void run(long now) { } } - List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now); + List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, metadata, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index f4d8913d08180..b566f94f193b4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -27,6 +27,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; @@ -300,6 +301,8 @@ public void testExpiredBatches() throws InterruptedException { long retryBackoffMs = 100L; long lingerMs = 3000L; int requestTimeout = 60; + Metadata metadata = new Metadata(); + metadata.update(cluster, time.milliseconds()); RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); int appends = 1024 / msgSize; @@ -314,20 +317,22 @@ public void testExpiredBatches() throws InterruptedException { Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); // Advance the clock to expire the batch. - time.sleep(2000L); - List expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + time.sleep(requestTimeout + 1); + List expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); assertEquals("The batch should not be expired when metadata is still available", 0, expiredBatches.size()); Cluster emptyCluster = new Cluster(new ArrayList(), new ArrayList(), Collections.emptySet()); - expiredBatches = accum.abortExpiredBatches(60, emptyCluster, time.milliseconds()); + Metadata emptyMetadata = new Metadata(); + emptyMetadata.update(emptyCluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(60, emptyMetadata, time.milliseconds()); assertEquals("The batch should be expired when metadata is missing", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); // Advance the clock to make batch ready due to linger.ms - time.sleep(2000L); + time.sleep(lingerMs); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); - // Advance the clock to make the batch expire - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + time.sleep(requestTimeout + 1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); assertEquals("The batch should not be expired when metadata is still available", 0, expiredBatches.size()); - expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyMetadata, time.milliseconds()); assertEquals("The batch should be expired when metadata is missing", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); @@ -343,12 +348,12 @@ public void testExpiredBatches() throws InterruptedException { accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); // test expiration. time.sleep(requestTimeout + retryBackoffMs); - expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyMetadata, time.milliseconds()); assertEquals("The batch should not be expired.", 0, expiredBatches.size()); time.sleep(1L); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); assertEquals("The batch should not be expired when metadata is still available", 0, expiredBatches.size()); - expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyMetadata, time.milliseconds()); assertEquals("The batch should be expired.", 1, expiredBatches.size()); } From 3115380a4d43590445ca947538113003a405f137 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 21 Mar 2016 13:26:54 -0700 Subject: [PATCH 3/9] Addressed Jun's comments. --- .../org/apache/kafka/clients/Metadata.java | 4 ---- .../producer/internals/RecordAccumulator.java | 20 ++++++++--------- .../clients/producer/internals/Sender.java | 5 ++++- .../internals/RecordAccumulatorTest.java | 22 +++++++++---------- 4 files changed, 24 insertions(+), 27 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 9af667c7d4104..73a9f333cc7a4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -179,10 +179,6 @@ public synchronized void update(Cluster cluster, long now) { public synchronized void failedUpdate(long now) { this.lastRefreshMs = now; } - - public synchronized long lastRefresh() { - return this.lastRefreshMs; - } /** * @return The current metadata version diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 3f7e675a5989e..99fb378f9294a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -14,7 +14,7 @@ import java.util.Iterator; -import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; @@ -214,19 +214,19 @@ public RecordAppendResult append(TopicPartition tp, * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout * due to metadata being unavailable */ - public List abortExpiredBatches(int requestTimeout, Metadata metadata, long now) { + public List abortExpiredBatches(int requestTimeout, + Cluster cluster, + KafkaClient client, + long now) { List expiredBatches = new ArrayList(); int count = 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque dq = entry.getValue(); - // Only expire a batch if its metadata is not available. - Cluster cluster = metadata.fetch(); - // Check if the metadata might be old. - boolean maybeStaleMetadata = metadata.lastRefresh() > metadata.lastSuccessfulUpdate(); - // We will check all the batches if metadata might be old, this is to avoid the case that no batches will - // be timeout out when all the brokers are done and producer keeps the stale metadata without timing out - // the batches. - if (maybeStaleMetadata || cluster.leaderFor(entry.getKey()) == null) { + // We will check if the batch should be expried if one of the following is true: + // 1. The leader is unknown. + // 2. The leader broker is disconnected. + Node leader = cluster.leaderFor(entry.getKey()); + if (leader == null || client.connectionFailed(leader)) { synchronized (dq) { // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 084773e821a98..95c62833df7e0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -202,7 +202,10 @@ void run(long now) { } } - List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, metadata, now); + List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, + cluster, + client, + now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index b566f94f193b4..7c9e5ce835998 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -27,7 +27,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.KafkaClient; +import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; @@ -301,8 +302,7 @@ public void testExpiredBatches() throws InterruptedException { long retryBackoffMs = 100L; long lingerMs = 3000L; int requestTimeout = 60; - Metadata metadata = new Metadata(); - metadata.update(cluster, time.milliseconds()); + KafkaClient client = new MockClient(time); RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); int appends = 1024 / msgSize; @@ -318,21 +318,19 @@ public void testExpiredBatches() throws InterruptedException { assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); // Advance the clock to expire the batch. time.sleep(requestTimeout + 1); - List expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); + List expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, client, time.milliseconds()); assertEquals("The batch should not be expired when metadata is still available", 0, expiredBatches.size()); Cluster emptyCluster = new Cluster(new ArrayList(), new ArrayList(), Collections.emptySet()); - Metadata emptyMetadata = new Metadata(); - emptyMetadata.update(emptyCluster, time.milliseconds()); - expiredBatches = accum.abortExpiredBatches(60, emptyMetadata, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(60, emptyCluster, client, time.milliseconds()); assertEquals("The batch should be expired when metadata is missing", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); // Advance the clock to make batch ready due to linger.ms time.sleep(lingerMs); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); time.sleep(requestTimeout + 1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, client, time.milliseconds()); assertEquals("The batch should not be expired when metadata is still available", 0, expiredBatches.size()); - expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyMetadata, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, client, time.milliseconds()); assertEquals("The batch should be expired when metadata is missing", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); @@ -348,12 +346,12 @@ public void testExpiredBatches() throws InterruptedException { accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); // test expiration. time.sleep(requestTimeout + retryBackoffMs); - expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyMetadata, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, client, time.milliseconds()); assertEquals("The batch should not be expired.", 0, expiredBatches.size()); time.sleep(1L); - expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, client, time.milliseconds()); assertEquals("The batch should not be expired when metadata is still available", 0, expiredBatches.size()); - expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyMetadata, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, client, time.milliseconds()); assertEquals("The batch should be expired.", 1, expiredBatches.size()); } From 11ee80334bec6036e361168190e2d1b6b51488a7 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 21 Mar 2016 14:00:11 -0700 Subject: [PATCH 4/9] Addressed Ismael's comments. --- .../kafka/clients/producer/internals/RecordAccumulator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 99fb378f9294a..32074f9f939a5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -222,7 +222,7 @@ public List abortExpiredBatches(int requestTimeout, int count = 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque dq = entry.getValue(); - // We will check if the batch should be expried if one of the following is true: + // We will check if the batch should be expired if one of the following is true: // 1. The leader is unknown. // 2. The leader broker is disconnected. Node leader = cluster.leaderFor(entry.getKey()); From f27fd8a448960864d5dc55bb77ddd87c7fee86da Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 21 Mar 2016 17:31:30 -0700 Subject: [PATCH 5/9] Addressed Jun's comments. --- .../src/main/java/org/apache/kafka/clients/NetworkClient.java | 3 ++- .../kafka/clients/producer/internals/RecordAccumulator.java | 4 +--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 4d01cdeb2e27d..ebdbf64576040 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -199,7 +199,8 @@ public long connectionDelay(Node node, long now) { */ @Override public boolean connectionFailed(Node node) { - return connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED); + return connectionStates.isConnected(node.idString()) && + connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 32074f9f939a5..908162065f76b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -244,9 +244,7 @@ public List abortExpiredBatches(int requestTimeout, batchIterator.remove(); deallocate(batch); } else { - if (!batch.inRetry()) { - break; - } + break; } } } From 2ac0cf62a0ed8d2c507e3e2a618a133d20d7e509 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Mon, 21 Mar 2016 17:43:15 -0700 Subject: [PATCH 6/9] Addressed Jun's comments. --- .../java/org/apache/kafka/clients/NetworkClient.java | 3 +-- .../clients/producer/internals/RecordAccumulator.java | 9 ++++++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index ebdbf64576040..4d01cdeb2e27d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -199,8 +199,7 @@ public long connectionDelay(Node node, long now) { */ @Override public boolean connectionFailed(Node node) { - return connectionStates.isConnected(node.idString()) && - connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED); + return connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 908162065f76b..8da9518caf19b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -226,7 +226,14 @@ public List abortExpiredBatches(int requestTimeout, // 1. The leader is unknown. // 2. The leader broker is disconnected. Node leader = cluster.leaderFor(entry.getKey()); - if (leader == null || client.connectionFailed(leader)) { + boolean leaderNotConnected = true; + try { + leaderNotConnected = client.connectionFailed(leader); + } catch (IllegalArgumentException e) { + // This means the client does not know the leader node. So it is not connected. + } + + if (leader == null || leaderNotConnected) { synchronized (dq) { // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); From a218b85c3a6a5426424a4f3a3f126968bc5d480b Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Tue, 22 Mar 2016 11:35:56 -0700 Subject: [PATCH 7/9] Fix a potential NPE. --- .../producer/internals/RecordAccumulator.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 8da9518caf19b..15dac169661fc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -227,13 +227,15 @@ public List abortExpiredBatches(int requestTimeout, // 2. The leader broker is disconnected. Node leader = cluster.leaderFor(entry.getKey()); boolean leaderNotConnected = true; - try { - leaderNotConnected = client.connectionFailed(leader); - } catch (IllegalArgumentException e) { - // This means the client does not know the leader node. So it is not connected. + if (leader != null) { + try { + leaderNotConnected = client.connectionFailed(leader); + } catch (IllegalStateException e) { + // This means the client does not know the leader node. So it is not connected. + } } - if (leader == null || leaderNotConnected) { + if (leader == null) { synchronized (dq) { // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); From 8a537d46a39e8994b66c87010cb1785cc50a32c3 Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Thu, 24 Mar 2016 14:46:53 -0700 Subject: [PATCH 8/9] Addressed Jun's comments. --- .../producer/internals/RecordAccumulator.java | 21 +++--------- .../clients/producer/internals/Sender.java | 5 +-- .../internals/RecordAccumulatorTest.java | 32 +++++++++---------- 3 files changed, 22 insertions(+), 36 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 15dac169661fc..bbf4147024db3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -14,7 +14,6 @@ import java.util.Iterator; -import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; @@ -214,28 +213,18 @@ public RecordAppendResult append(TopicPartition tp, * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout * due to metadata being unavailable */ - public List abortExpiredBatches(int requestTimeout, - Cluster cluster, - KafkaClient client, - long now) { + public List abortExpiredBatches(int requestTimeout, Cluster cluster, long now) { List expiredBatches = new ArrayList(); int count = 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque dq = entry.getValue(); // We will check if the batch should be expired if one of the following is true: // 1. The leader is unknown. - // 2. The leader broker is disconnected. - Node leader = cluster.leaderFor(entry.getKey()); - boolean leaderNotConnected = true; - if (leader != null) { - try { - leaderNotConnected = client.connectionFailed(leader); - } catch (IllegalStateException e) { - // This means the client does not know the leader node. So it is not connected. - } - } + // 2. The partition does not have a batch in flight. + TopicPartition tp = entry.getKey(); + Node leader = cluster.leaderFor(tp); - if (leader == null) { + if (leader == null || !muted.contains(tp)) { synchronized (dq) { // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 95c62833df7e0..db8918c2a49dd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -202,10 +202,7 @@ void run(long now) { } } - List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, - cluster, - client, - now); + List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, cluster, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 7c9e5ce835998..8205f907d53a8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -27,8 +27,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.kafka.clients.KafkaClient; -import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; @@ -302,7 +300,6 @@ public void testExpiredBatches() throws InterruptedException { long retryBackoffMs = 100L; long lingerMs = 3000L; int requestTimeout = 60; - KafkaClient client = new MockClient(time); RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); int appends = 1024 / msgSize; @@ -318,20 +315,22 @@ public void testExpiredBatches() throws InterruptedException { assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); // Advance the clock to expire the batch. time.sleep(requestTimeout + 1); - List expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, client, time.milliseconds()); - assertEquals("The batch should not be expired when metadata is still available", 0, expiredBatches.size()); + accum.mutePartition(tp1); + List expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); Cluster emptyCluster = new Cluster(new ArrayList(), new ArrayList(), Collections.emptySet()); - expiredBatches = accum.abortExpiredBatches(60, emptyCluster, client, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(60, emptyCluster, time.milliseconds()); assertEquals("The batch should be expired when metadata is missing", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); // Advance the clock to make batch ready due to linger.ms time.sleep(lingerMs); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); time.sleep(requestTimeout + 1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, client, time.milliseconds()); - assertEquals("The batch should not be expired when metadata is still available", 0, expiredBatches.size()); - expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, client, time.milliseconds()); - assertEquals("The batch should be expired when metadata is missing", 1, expiredBatches.size()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); + accum.unmutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); // Test batches in retry. @@ -346,14 +345,15 @@ public void testExpiredBatches() throws InterruptedException { accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); // test expiration. time.sleep(requestTimeout + retryBackoffMs); - expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, client, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, time.milliseconds()); assertEquals("The batch should not be expired.", 0, expiredBatches.size()); time.sleep(1L); - expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, client, time.milliseconds()); - assertEquals("The batch should not be expired when metadata is still available", 0, expiredBatches.size()); - expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, client, time.milliseconds()); - assertEquals("The batch should be expired.", 1, expiredBatches.size()); - + accum.mutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); + accum.unmutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size()); } @Test From ba5f8b06c2573544aecc531d167b7e37ae195f8b Mon Sep 17 00:00:00 2001 From: Jiangjie Qin Date: Fri, 25 Mar 2016 11:39:23 -0700 Subject: [PATCH 9/9] Addressed Jun's comments. --- .../producer/internals/RecordAccumulator.java | 19 +++++++--------- .../producer/internals/RecordBatch.java | 8 +++---- .../internals/RecordAccumulatorTest.java | 22 +++++++++++++------ 3 files changed, 26 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index bbf4147024db3..915c4d3e9d930 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -218,30 +218,27 @@ public List abortExpiredBatches(int requestTimeout, Cluster cluster int count = 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque dq = entry.getValue(); - // We will check if the batch should be expired if one of the following is true: - // 1. The leader is unknown. - // 2. The partition does not have a batch in flight. + // We only check if the batch should be expired if the partition does not have a batch in flight. + // This is to avoid the later batches get expired when an earlier batch is still in progress. + // This protection only takes effect when user sets max.in.flight.request.per.connection=1. + // Otherwise the expiration order is not guranteed. TopicPartition tp = entry.getKey(); - Node leader = cluster.leaderFor(tp); - - if (leader == null || !muted.contains(tp)) { + if (!muted.contains(tp)) { synchronized (dq) { // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); Iterator batchIterator = dq.iterator(); while (batchIterator.hasNext()) { RecordBatch batch = batchIterator.next(); - // We have to close the previous batches in the deque before expiration check, otherwise those - // batches may never be considered as full. - if (batch != lastBatch) - batch.records.close(); + boolean isFull = batch != lastBatch || batch.records.isFull(); // check if the batch is expired - if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs)) { + if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) { expiredBatches.add(batch); count++; batchIterator.remove(); deallocate(batch); } else { + // Stop at the first batch that has not expired. break; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index fae41f69e3f03..1e532cb7b5532 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -136,16 +136,14 @@ public String toString() { /** * A batch whose metadata is not available should be expired if one of the following is true: *
      - *
    1. the batch is not in retry AND request timeout has elapsed after it is ready. (We need to see if a batch is - * ready by explicitly checking if the record is full or linger time is met because the accumulator's partition - * may not be ready if the leader is unavailable.) + *
    2. the batch is not in retry AND request timeout has elapsed after it is ready (full or linger.ms has reached). *
    3. the batch is in retry AND request timeout has elapsed after the backoff period ended. *
    */ - public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs) { + public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) { boolean expire = false; - if (!this.inRetry() && this.records.isFull() && requestTimeoutMs < (now - this.lastAppendTime)) + if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) expire = true; else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs))) expire = true; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 8205f907d53a8..904aa73bd33cd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -317,17 +317,22 @@ public void testExpiredBatches() throws InterruptedException { time.sleep(requestTimeout + 1); accum.mutePartition(tp1); List expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); - assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); - Cluster emptyCluster = new Cluster(new ArrayList(), new ArrayList(), Collections.emptySet()); - expiredBatches = accum.abortExpiredBatches(60, emptyCluster, time.milliseconds()); - assertEquals("The batch should be expired when metadata is missing", 1, expiredBatches.size()); + assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); + + accum.unmutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should be expired", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); - // Advance the clock to make batch ready due to linger.ms + + // Advance the clock to make the next batch ready due to linger.ms time.sleep(lingerMs); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); time.sleep(requestTimeout + 1); + + accum.mutePartition(tp1); expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); + accum.unmutePartition(tp1); expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size()); @@ -343,14 +348,17 @@ public void testExpiredBatches() throws InterruptedException { assertEquals("There should be only one batch.", drained.get(node1.id()).size(), 1); time.sleep(1000L); accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); + // test expiration. time.sleep(requestTimeout + retryBackoffMs); - expiredBatches = accum.abortExpiredBatches(requestTimeout, emptyCluster, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); assertEquals("The batch should not be expired.", 0, expiredBatches.size()); time.sleep(1L); + accum.mutePartition(tp1); expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); - assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); + assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); + accum.unmutePartition(tp1); expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size());