Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.apache.kafka.clients.producer.internals;

import java.util.Iterator;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.MetricName;
Expand Down Expand Up @@ -217,19 +218,27 @@ public List<RecordBatch> abortExpiredBatches(int requestTimeout, Cluster cluster
int count = 0;
for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) {
Deque<RecordBatch> dq = entry.getValue();
synchronized (dq) {
// iterate over the batches and expire them if they have stayed in the accumulator for more than requestTimeout
Iterator<RecordBatch> batchIterator = dq.iterator();
while (batchIterator.hasNext()) {
RecordBatch batch = batchIterator.next();
// check if the batch is expired
if (batch.maybeExpire(requestTimeout, now, this.lingerMs)) {
expiredBatches.add(batch);
count++;
batchIterator.remove();
deallocate(batch);
} else {
if (!batch.inRetry()) {
// We only check if the batch should be expired if the partition does not have a batch in flight.
// This is to avoid the later batches get expired when an earlier batch is still in progress.
// This protection only takes effect when user sets max.in.flight.request.per.connection=1.
// Otherwise the expiration order is not guaranteed.
TopicPartition tp = entry.getKey();
if (!muted.contains(tp)) {
synchronized (dq) {
// iterate over the batches and expire them if they have stayed in the accumulator for more than requestTimeout
RecordBatch lastBatch = dq.peekLast();
Iterator<RecordBatch> batchIterator = dq.iterator();
while (batchIterator.hasNext()) {
RecordBatch batch = batchIterator.next();
boolean isFull = batch != lastBatch || batch.records.isFull();
// check if the batch is expired
if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) {
expiredBatches.add(batch);
count++;
batchIterator.remove();
deallocate(batch);
} else {
// Stop at the first batch that has not expired.
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,23 @@ public String toString() {
}

/**
* Expire the batch that is ready but is sitting in accumulator for more than requestTimeout due to metadata being unavailable.
* We need to explicitly check if the record is full or linger time is met because the accumulator's partition may not be ready
* if the leader is unavailable.
* A batch whose metadata is not available should be expired if one of the following is true:
* <ol>
* <li> the batch is not in retry AND request timeout has elapsed after it is ready (full or linger.ms has been reached).
* <li> the batch is in retry AND request timeout has elapsed after the backoff period ended.
* </ol>
*/
public boolean maybeExpire(int requestTimeout, long now, long lingerMs) {
public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) {
boolean expire = false;
if ((this.records.isFull() && requestTimeout < (now - this.lastAppendTime)) || requestTimeout < (now - (this.lastAttemptMs + lingerMs))) {

if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime))
expire = true;
else if (!this.inRetry() && requestTimeoutMs < (now - (this.createdMs + lingerMs)))
expire = true;
else if (this.inRetry() && requestTimeoutMs < (now - (this.lastAttemptMs + retryBackoffMs)))
expire = true;

if (expire) {
this.records.close();
this.done(-1L, Record.NO_TIMESTAMP, new TimeoutException("Batch Expired"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,22 +297,71 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {

@Test
public void testExpiredBatches() throws InterruptedException {
long now = time.milliseconds();
RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, 10, 100L, metrics, time);
long retryBackoffMs = 100L;
long lingerMs = 3000L;
int requestTimeout = 60;

RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time);
int appends = 1024 / msgSize;

// Test batches not in retry
for (int i = 0; i < appends; i++) {
accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size());
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
}
time.sleep(2000);
accum.ready(cluster, now);
// Make the batches ready due to batch full
accum.append(tp1, 0L, key, value, null, 0);
Set<Node> readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>(), Collections.<String>emptySet());
now = time.milliseconds();
List<RecordBatch> expiredBatches = accum.abortExpiredBatches(60, cluster, now);
assertEquals(1, expiredBatches.size());
// Advance the clock to expire the batch.
time.sleep(requestTimeout + 1);
accum.mutePartition(tp1);
List<RecordBatch> expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());

accum.unmutePartition(tp1);
expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
assertEquals("The batch should be expired", 1, expiredBatches.size());
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());

// Advance the clock to make the next batch ready due to linger.ms
time.sleep(lingerMs);
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
time.sleep(requestTimeout + 1);

accum.mutePartition(tp1);
expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size());

accum.unmutePartition(tp1);
expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size());
assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());

// Test batches in retry.
// Create a retried batch
accum.append(tp1, 0L, key, value, null, 0);
time.sleep(lingerMs);
readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes;
assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes);
Map<Integer, List<RecordBatch>> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds());
assertEquals("There should be only one batch.", drained.get(node1.id()).size(), 1);
time.sleep(1000L);
accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds());

// test expiration.
time.sleep(requestTimeout + retryBackoffMs);
expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
assertEquals("The batch should not be expired.", 0, expiredBatches.size());
time.sleep(1L);

accum.mutePartition(tp1);
expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size());

accum.unmutePartition(tp1);
expiredBatches = accum.abortExpiredBatches(requestTimeout, cluster, time.milliseconds());
assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size());
}

@Test
Expand Down