diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index beaa832d3889e..915c4d3e9d930 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -13,6 +13,7 @@ package org.apache.kafka.clients.producer.internals; import java.util.Iterator; + import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; @@ -217,19 +218,27 @@ public List abortExpiredBatches(int requestTimeout, Cluster cluster int count = 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque dq = entry.getValue(); - synchronized (dq) { - // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut - Iterator batchIterator = dq.iterator(); - while (batchIterator.hasNext()) { - RecordBatch batch = batchIterator.next(); - // check if the batch is expired - if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) { - expiredBatches.add(batch); - count++; - batchIterator.remove(); - deallocate(batch); - } else { - if (!batch.inRetry()) { + // We only check if the batch should be expired if the partition does not have a batch in flight. + // This is to avoid later batches being expired while an earlier batch is still in progress. + // This protection only takes effect when the user sets max.in.flight.requests.per.connection=1. + // Otherwise the expiration order is not guaranteed.
+ TopicPartition tp = entry.getKey(); + if (!muted.contains(tp)) { + synchronized (dq) { + // iterate over the batches and expire them if they have stayed in accumulator for more than requestTimeOut + RecordBatch lastBatch = dq.peekLast(); + Iterator batchIterator = dq.iterator(); + while (batchIterator.hasNext()) { + RecordBatch batch = batchIterator.next(); + boolean isFull = batch != lastBatch || batch.records.isFull(); + // check if the batch is expired + if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) { + expiredBatches.add(batch); + count++; + batchIterator.remove(); + deallocate(batch); + } else { + // Stop at the first batch that has not expired. break; } } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java index 7b5fbbe0736dd..1e532cb7b5532 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java @@ -134,14 +134,23 @@ public String toString() { } /** - * Expire the batch that is ready but is sitting in accumulator for more than requestTimeout due to metadata being unavailable. - * We need to explicitly check if the record is full or linger time is met because the accumulator's partition may not be ready - * if the leader is unavailable. + * A batch whose metadata is not available should be expired if one of the following is true: + *
    + *
  1. the batch is not in retry AND request timeout has elapsed after it is ready (full or linger.ms has been reached). + *
  2. the batch is in retry AND request timeout has elapsed after the backoff period ended. + *
*/ - public boolean maybeExpire(int requestTimeout, long now, long lingerMs) { + public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) { boolean expire = false; - if ((this.records.isFull() && requestTimeout < (now - this.lastAppendTime)) || requestTimeout < (now - (this.lastAttemptMs + lingerMs))) { + + if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) + expire = true; + else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs))) expire = true; + else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs))) + expire = true; + + if (expire) { this.records.close(); this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Batch Expired")); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 366027286181d..904aa73bd33cd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -297,22 +297,71 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { @Test public void testExpiredBatches() throws InterruptedException { - long now = time.milliseconds(); - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time); + long retryBackoffMs = 100L; + long lingerMs = 3000L; + int requestTimeout = 60; + + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); int appends = 1024 / msgSize; + + // Test batches not in retry for (int i = 0; i < appends; i++) { accum.append(tp1, 0L, key, value, null, maxBlockTimeMs); - assertEquals("No partitions should be ready.", 0, accum.ready(cluster, 
now).readyNodes.size()); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); } - time.sleep(2000); - accum.ready(cluster, now); + // Make the batches ready due to batch full accum.append(tp1, 0L, key, value, null, 0); Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); - Cluster cluster = new Cluster(new ArrayList(), new ArrayList(), Collections.emptySet()); - now = time.milliseconds(); - List expiredBatches = accum.abortExpiredBatches(60, cluster, now); - assertEquals(1, expiredBatches.size()); + // Advance the clock to expire the batch. + time.sleep(requestTimeout + 1); + accum.mutePartition(tp1); + List expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); + + accum.unmutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should be expired", 1, expiredBatches.size()); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); + + // Advance the clock to make the next batch ready due to linger.ms + time.sleep(lingerMs); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); + time.sleep(requestTimeout + 1); + + accum.mutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); + + accum.unmutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should be expired when the partition is not muted", 1, 
expiredBatches.size()); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); + + // Test batches in retry. + // Create a retried batch + accum.append(tp1, 0L, key, value, null, 0); + time.sleep(lingerMs); + readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); + Map> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertEquals("There should be only one batch.", drained.get(node1.id()).size(), 1); + time.sleep(1000L); + accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); + + // test expiration. + time.sleep(requestTimeout + retryBackoffMs); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired.", 0, expiredBatches.size()); + time.sleep(1L); + + accum.mutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); + + accum.unmutePartition(tp1); + expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds()); + assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size()); } @Test