From 33b85807b230f0392727f130f28ba2550de7a413 Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Thu, 25 Aug 2016 18:24:06 -0700 Subject: [PATCH 01/19] KAFKA-4089: fixing batch expired when stale metadata --- .../main/java/org/apache/kafka/clients/Metadata.java | 9 ++++++++- .../kafka/clients/producer/internals/Sender.java | 12 +++++++----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index f717001f4deaa..42c7cfda28d42 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -115,7 +115,7 @@ public synchronized void add(String topic) { /** * The next time to update the cluster info is the maximum of the time the current info will expire and the time the - * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time + * current info can be updated (i.e. backoff time has elapsed); If an update has been requested then the expiry time * is now */ public synchronized long timeToNextUpdate(long nowMs) { @@ -124,6 +124,13 @@ public synchronized long timeToNextUpdate(long nowMs) { return Math.max(timeToExpire, timeToAllowUpdate); } + /** + * The metadata has expired if an update is explicitly requested or an update is due now. 
+ */ + public synchronized boolean hasExpired(long nowMs) { + return this.needUpdate || this.timeToNextUpdate(nowMs) == 0; + } + /** * Request an update of the current cluster metadata info, return the current version before the update */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 30f888773c2cc..47624521ba6d7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -208,11 +208,13 @@ void run(long now) { } } - List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); - // update sensors - for (RecordBatch expiredBatch : expiredBatches) - this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); - + if (!metadata.hasExpired(now)) { + List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); + // update sensors + for (RecordBatch expiredBatch : expiredBatches) + this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); + } + sensors.updateProduceRequestMetrics(batches); List requests = createProduceRequests(batches, now); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately From 4ccff43bdbc5601b54f6b1a7ee45623741201358 Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Fri, 26 Aug 2016 17:00:30 -0700 Subject: [PATCH 02/19] updated batch expiry conditions --- .../src/main/java/org/apache/kafka/clients/Metadata.java | 8 +++++--- .../apache/kafka/clients/producer/internals/Sender.java | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 42c7cfda28d42..060332f2a0af9 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -52,6 +52,7 @@ public final class Metadata { private final long refreshBackoffMs; private final long metadataExpireMs; + private final long metadataStaleMs; private int version; private long lastRefreshMs; private long lastSuccessfulRefreshMs; @@ -86,6 +87,7 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs) { public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; + this.metadataStaleMs = metadataExpireMs * 2; this.topicExpiryEnabled = topicExpiryEnabled; this.lastRefreshMs = 0L; this.lastSuccessfulRefreshMs = 0L; @@ -125,10 +127,10 @@ public synchronized long timeToNextUpdate(long nowMs) { } /** - * The metadata has expired if an update is explicitly requested or an update is due now. + * The metadata is stale if it has taken more than staleMs after last successful refresh. 
*/ - public synchronized boolean hasExpired(long nowMs) { - return this.needUpdate || this.timeToNextUpdate(nowMs) == 0; + public synchronized boolean isStale(long nowMs) { + return (nowMs - this.lastSuccessfulRefreshMs) > this.metadataStaleMs; } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 47624521ba6d7..86db44484f396 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -208,7 +208,7 @@ void run(long now) { } } - if (!metadata.hasExpired(now)) { + if (metadata.isStale(now) || (metadata.timeToNextUpdate(now) != 0 && !result.unknownLeaderTopics.isEmpty())) { List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) From 2589bb3cb475dcffaf0f67e075eca23859f442df Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Mon, 29 Aug 2016 09:58:45 -0700 Subject: [PATCH 03/19] updated conditions for batch expiry --- .../kafka/clients/producer/internals/RecordAccumulator.java | 6 ++++-- .../org/apache/kafka/clients/producer/internals/Sender.java | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index fa1e51352626b..3228c310089f4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -15,6 +15,7 @@ import java.util.Iterator; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.common.Cluster; import 
org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; @@ -223,8 +224,9 @@ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, C * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout * due to metadata being unavailable */ - public List abortExpiredBatches(int requestTimeout, long now) { + public List abortExpiredBatches(int requestTimeout, Metadata metadata, long now) { List expiredBatches = new ArrayList<>(); + Cluster cluster = metadata.fetch(); int count = 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque dq = entry.getValue(); @@ -233,7 +235,7 @@ public List abortExpiredBatches(int requestTimeout, long now) { // This is to prevent later batches from being expired while an earlier batch is still in progress. // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection // is only active in this case. Otherwise the expiration order is not guaranteed. 
- if (!muted.contains(tp)) { + if (metadata.isStale(now) || cluster.leaderFor(tp) == null) { synchronized (dq) { // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 86db44484f396..fd7907bd88920 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -208,8 +208,8 @@ void run(long now) { } } - if (metadata.isStale(now) || (metadata.timeToNextUpdate(now) != 0 && !result.unknownLeaderTopics.isEmpty())) { - List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now); + if (metadata.isStale(now) || !result.unknownLeaderTopics.isEmpty())) { + List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, metadata, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); From 21997499c6787136ab3165295a253bd2d9fcccc5 Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Mon, 29 Aug 2016 13:25:38 -0700 Subject: [PATCH 04/19] fixing accumulator test --- .../org/apache/kafka/clients/Metadata.java | 6 ++-- .../clients/producer/internals/Sender.java | 2 +- .../internals/RecordAccumulatorTest.java | 32 +++++++++++-------- 3 files changed, 23 insertions(+), 17 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 060332f2a0af9..8f877d74e513d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -73,7 +73,7 @@ public Metadata() { } public Metadata(long 
refreshBackoffMs, long metadataExpireMs) { - this(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners()); + this(refreshBackoffMs, metadataExpireMs, false, Cluster.empty(), new ClusterResourceListeners()); } /** @@ -84,7 +84,7 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs) { * @param topicExpiryEnabled If true, enable expiry of unused topics * @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates. */ - public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) { + public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, Cluster cluster, ClusterResourceListeners clusterResourceListeners) { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; this.metadataStaleMs = metadataExpireMs * 2; @@ -92,7 +92,7 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpir this.lastRefreshMs = 0L; this.lastSuccessfulRefreshMs = 0L; this.version = 0; - this.cluster = Cluster.empty(); + this.cluster = cluster; this.needUpdate = false; this.topics = new HashMap<>(); this.listeners = new ArrayList<>(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index fd7907bd88920..c11f99ba59373 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -208,7 +208,7 @@ void run(long now) { } } - if (metadata.isStale(now) || !result.unknownLeaderTopics.isEmpty())) { + if (metadata.isStale(now) || !result.unknownLeaderTopics.isEmpty()) { List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, metadata, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 216f07eb8f644..32872db4252e9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; @@ -73,7 +74,7 @@ public class RecordAccumulatorTest { public void teardown() { this.metrics.close(); } - +/* @Test public void testFull() throws Exception { long now = time.milliseconds(); @@ -340,15 +341,19 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { assertFalse(accum.hasUnsent()); } - +*/ @Test public void testExpiredBatches() throws InterruptedException { long retryBackoffMs = 100L; long lingerMs = 3000L; int requestTimeout = 60; - - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); - int appends = 1024 / msgSize; + int batchSize = 1024; + long totalSize = 10 * 1024; + long metadataMaxAgeMs = 5 * 60 * 1000L; // 5 min + + Metadata metadata = new Metadata(retryBackoffMs, metadataMaxAgeMs, true, cluster); + RecordAccumulator accum = new RecordAccumulator(batchSize, totalSize, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); + int appends = batchSize / msgSize; // Test batches not in retry for (int i = 0; i < appends; i++) { @@ -362,11 +367,11 @@ public void testExpiredBatches() throws InterruptedException { // Advance the clock to expire the batch. 
time.sleep(requestTimeout + 1); accum.mutePartition(tp1); - List expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); + List expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); assertEquals("The batch should be expired", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); @@ -376,11 +381,11 @@ public void testExpiredBatches() throws InterruptedException { time.sleep(requestTimeout + 1); accum.mutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); @@ -397,19 +402,19 @@ public void testExpiredBatches() throws InterruptedException { // test expiration. 
time.sleep(requestTimeout + retryBackoffMs); - expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); assertEquals("The batch should not be expired.", 0, expiredBatches.size()); time.sleep(1L); accum.mutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, time.milliseconds()); + expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size()); } - +/* @Test public void testMutedPartitions() throws InterruptedException { long now = time.milliseconds(); @@ -441,4 +446,5 @@ public void testMutedPartitions() throws InterruptedException { drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertTrue("The batch should have been drained.", drained.get(node1.id()).size() > 0); } +*/ } From 68643ffc2c831e9a4e4835a291979807357347a3 Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Tue, 30 Aug 2016 14:55:53 -0700 Subject: [PATCH 05/19] fixed unit test --- .../org/apache/kafka/clients/Metadata.java | 2 +- .../producer/internals/RecordAccumulator.java | 42 ++++++ .../internals/RecordAccumulatorTest.java | 137 ++++++++++++++++-- 3 files changed, 165 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 8f877d74e513d..000ed0e25767c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -92,7 +92,7 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpir this.lastRefreshMs = 0L; this.lastSuccessfulRefreshMs = 0L; this.version = 0; - this.cluster = cluster; + this.cluster = Cluster.empty(); this.needUpdate = false; this.topics = new HashMap<>(); this.listeners = new ArrayList<>(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 3228c310089f4..f35527931c1bf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -263,6 +263,48 @@ public List abortExpiredBatches(int requestTimeout, Metadata metada return expiredBatches; } + /** + * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout + * due to metadata being unavailable + */ + public List abortExpiredBatchesOld(int requestTimeout, long now) { + List expiredBatches = new ArrayList<>(); + int count = 0; + for (Map.Entry> entry : this.batches.entrySet()) { + Deque dq = entry.getValue(); + TopicPartition tp = entry.getKey(); + // We only check if the batch should be expired if the partition does not have a batch in flight. + // This is to prevent later batches from being expired while an earlier batch is still in progress. + // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection + // is only active in this case. Otherwise the expiration order is not guaranteed. 
+ if (!muted.contains(tp)) { + synchronized (dq) { + // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut + RecordBatch lastBatch = dq.peekLast(); + Iterator batchIterator = dq.iterator(); + while (batchIterator.hasNext()) { + RecordBatch batch = batchIterator.next(); + boolean isFull = batch != lastBatch || batch.records.isFull(); + // check if the batch is expired + if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) { + expiredBatches.add(batch); + count++; + batchIterator.remove(); + deallocate(batch); + } else { + // Stop at the first batch that has not expired. + break; + } + } + } + } + } + if (!expiredBatches.isEmpty()) + log.trace("Expired {} batches in accumulator", count); + + return expiredBatches; + } + /** * Re-enqueue the given record batch in the accumulator to retry */ diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 32872db4252e9..2d2cfca26c137 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.Iterator; @@ -74,7 +75,7 @@ public class RecordAccumulatorTest { public void teardown() { this.metrics.close(); } -/* + @Test public void testFull() throws Exception { long now = time.milliseconds(); @@ -341,20 +342,126 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { assertFalse(accum.hasUnsent()); } -*/ + @Test public void testExpiredBatches() throws InterruptedException { long retryBackoffMs = 100L; - long lingerMs = 3000L; - int 
requestTimeout = 60; + long lingerMs = 500L; + int requestTimeoutMs = 2000; int batchSize = 1024; long totalSize = 10 * 1024; - long metadataMaxAgeMs = 5 * 60 * 1000L; // 5 min + long metadataMaxAgeMs = 5 * 60 * 1000L; // 5 min + long staleMetadataAgeMs = 2 * metadataMaxAgeMs; + List expiredBatches; + RecordAccumulator.ReadyCheckResult result; + Set readyNodes; + Map> drained; + + assertEquals("Stale metadata age must be more than request timeout", true, staleMetadataAgeMs > metadataMaxAgeMs); - Metadata metadata = new Metadata(retryBackoffMs, metadataMaxAgeMs, true, cluster); + Metadata metadata = new Metadata(retryBackoffMs, metadataMaxAgeMs); + metadata.update(cluster, time.milliseconds()); RecordAccumulator accum = new RecordAccumulator(batchSize, totalSize, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); int appends = batchSize / msgSize; + // Test batches not in retry + for (int i = 0; i < appends; i++) { + accum.append(tp1, 0L, key, value, null, maxBlockTimeMs); + assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); + } + // subtest1: Make the batches ready due to batch full + accum.append(tp1, 0L, key, value, null, 0); + result = accum.ready(cluster, time.milliseconds()); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), result.readyNodes); + + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); + assertTrue("The batch should not expire because leaders are known and metadata is fresh.", + result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0); + // subtest1 done + + // subtest2: test effects of mute-unmute on ready and expired batches + accum.mutePartition(tp1); // a batch is now in-flight. 
+ + result = accum.ready(cluster, time.milliseconds()); + assertTrue("No partition should be ready because it's muted", result.readyNodes.isEmpty()); + + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); + assertTrue("The batch should not expire because the leaders are known and metadata is fresh.", + result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0); + + accum.unmutePartition(tp1); + // subtest2 done + + // subtest2: test effect of linger on ready and expired batches + time.sleep(lingerMs + 1); // Advance the clock beyond linger.ms. + result = accum.ready(cluster, time.milliseconds()); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), result.readyNodes); + // subtest2 done + + // subtest3: test the effect of request timeout. + time.sleep(requestTimeoutMs); + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); + assertTrue("The batch should not expire because the leaders are known and metadata is fresh.", + result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0); + // subtest3 done + + // drain first batch. + drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertEquals("There should be one batch.", drained.get(node1.id()).size(), 1); + + // drain the second batch. + drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertEquals("Drain again. There should be one batch.", drained.get(node1.id()).size(), 1); + + // subtest 4: Test batches in retry. Create a retried batch + accum.append(tp1, 0L, key, value, null, 0); + time.sleep(lingerMs + 1); + result = accum.ready(cluster, time.milliseconds()); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), result.readyNodes); + time.sleep(requestTimeoutMs); + accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); + // subtest4 done. 
+ + // subtest5: Test meatadata expiry + time.sleep(staleMetadataAgeMs); + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); + assertEquals("The batch should expire because metadata is no longer fresh.", 2, expiredBatches.size()); + // subtest5 done + + // subtest 6: Test batches with missing leader. + int partition4 = 3; + TopicPartition tp4 = new TopicPartition(topic, partition4); + PartitionInfo part4 = new PartitionInfo(topic, partition4, null, null, null); + Map extra = Collections.singletonMap(tp4, part4); + cluster = cluster.withPartitions(extra); + + // update metadata + metadata.update(cluster, time.milliseconds()); + + accum.append(tp4, 0L, key, value, null, 0); + time.sleep(lingerMs + 1); + + result = accum.ready(cluster, time.milliseconds()); + assertEquals("Node should not be ready", 0, result.readyNodes.size()); + + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); + assertEquals("The batch should not expire because request timeout has not passed.", 0, expiredBatches.size()); + + time.sleep(requestTimeoutMs+ 1); + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); + assertEquals("The batch should expire because request timeout has passed.", 1, expiredBatches.size()); + // subtest6 done + } + + @Test + public void testExpiredBatchesOld() throws InterruptedException { + long retryBackoffMs = 100L; + long lingerMs = 3000L; + int requestTimeout = 60; + + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); + int appends = 1024 / msgSize; + // Test batches not in retry for (int i = 0; i < appends; i++) { accum.append(tp1, 0L, key, value, null, maxBlockTimeMs); @@ -367,11 +474,11 @@ public void testExpiredBatches() throws InterruptedException { // Advance the clock to expire the batch. 
time.sleep(requestTimeout + 1); accum.mutePartition(tp1); - List expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); + List expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); + expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); assertEquals("The batch should be expired", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); @@ -381,11 +488,11 @@ public void testExpiredBatches() throws InterruptedException { time.sleep(requestTimeout + 1); accum.mutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); + expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); + expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size()); assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); @@ -402,19 +509,19 @@ public void testExpiredBatches() throws InterruptedException { // test expiration. 
time.sleep(requestTimeout + retryBackoffMs); - expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); + expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); assertEquals("The batch should not be expired.", 0, expiredBatches.size()); time.sleep(1L); accum.mutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); + expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatches(requestTimeout, metadata, time.milliseconds()); + expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size()); } -/* + @Test public void testMutedPartitions() throws InterruptedException { long now = time.milliseconds(); @@ -446,5 +553,5 @@ public void testMutedPartitions() throws InterruptedException { drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertTrue("The batch should have been drained.", drained.get(node1.id()).size() > 0); } -*/ + } From 30048ea8833792a3ce9fd737300c6936aac482f5 Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Tue, 30 Aug 2016 17:59:36 -0700 Subject: [PATCH 06/19] checkstyle fixes --- .../clients/producer/internals/RecordAccumulatorTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 2d2cfca26c137..730ee7cbc98da 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -21,7 +21,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.Deque; import java.util.Iterator; @@ -447,7 +446,7 @@ public void testExpiredBatches() throws InterruptedException { expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); assertEquals("The batch should not expire because request timeout has not passed.", 0, expiredBatches.size()); - time.sleep(requestTimeoutMs+ 1); + time.sleep(requestTimeoutMs + 1); expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); assertEquals("The batch should expire because request timeout has passed.", 1, expiredBatches.size()); // subtest6 done From e64af48f00b5cb3bc292b03d80ea856d51a00808 Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Thu, 1 Sep 2016 14:30:44 -0700 Subject: [PATCH 07/19] fixing batch expiry --- .../org/apache/kafka/clients/Metadata.java | 9 ++- .../kafka/clients/producer/KafkaProducer.java | 5 ++ .../producer/internals/RecordAccumulator.java | 44 +----------- .../internals/RecordAccumulatorTest.java | 71 +------------------ 4 files changed, 14 insertions(+), 115 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 000ed0e25767c..660f6d898ca23 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -49,7 +49,11 @@ public final class Metadata { public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000; private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L; + private static final int DEFAULT_REQUEST_TIMEOUT_MS = 30000; + private static final long DEFAULT_RETRY_BACKOFF_MS = 100L; + private static final long DEFAULT_METADATA_MAX_AGE_MS = 60 
* 60 * 1000L; + private final int requestTimeoutMs; private final long refreshBackoffMs; private final long metadataExpireMs; private final long metadataStaleMs; @@ -69,7 +73,7 @@ public final class Metadata { * Create a metadata instance with reasonable defaults */ public Metadata() { - this(100L, 60 * 60 * 1000L); + this(DEFAULT_RETRY_BACKOFF_MS, DEFAULT_METADATA_MAX_AGE_MS); } public Metadata(long refreshBackoffMs, long metadataExpireMs) { @@ -87,7 +91,8 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs) { public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, Cluster cluster, ClusterResourceListeners clusterResourceListeners) { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; - this.metadataStaleMs = metadataExpireMs * 2; + this.requestTimeoutMs = requestTimeoutMs; + this.metadataStaleMs = metadataExpireMs + 3 * (requestTimeoutMs + refreshBackoffMs); this.topicExpiryEnabled = topicExpiryEnabled; this.lastRefreshMs = 0L; this.lastSuccessfulRefreshMs = 0L; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3efc7b5cb69ce..7b0552f77faf0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -290,6 +290,11 @@ private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serial this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); } + this.metadata = new Metadata(retryBackoffMs, + config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), + true, + this.requestTimeoutMs); + this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.totalMemorySize, this.compressionType, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index f35527931c1bf..d2dbff1db08d6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -235,49 +235,7 @@ public List abortExpiredBatches(int requestTimeout, Metadata metada // This is to prevent later batches from being expired while an earlier batch is still in progress. // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection // is only active in this case. Otherwise the expiration order is not guaranteed. - if (metadata.isStale(now) || cluster.leaderFor(tp) == null) { - synchronized (dq) { - // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut - RecordBatch lastBatch = dq.peekLast(); - Iterator batchIterator = dq.iterator(); - while (batchIterator.hasNext()) { - RecordBatch batch = batchIterator.next(); - boolean isFull = batch != lastBatch || batch.records.isFull(); - // check if the batch is expired - if (batch.maybeExpire(requestTimeout, retryBackoffMs, now, this.lingerMs, isFull)) { - expiredBatches.add(batch); - count++; - batchIterator.remove(); - deallocate(batch); - } else { - // Stop at the first batch that has not expired. 
- break; - } - } - } - } - } - if (!expiredBatches.isEmpty()) - log.trace("Expired {} batches in accumulator", count); - - return expiredBatches; - } - - /** - * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout - * due to metadata being unavailable - */ - public List abortExpiredBatchesOld(int requestTimeout, long now) { - List expiredBatches = new ArrayList<>(); - int count = 0; - for (Map.Entry> entry : this.batches.entrySet()) { - Deque dq = entry.getValue(); - TopicPartition tp = entry.getKey(); - // We only check if the batch should be expired if the partition does not have a batch in flight. - // This is to prevent later batches from being expired while an earlier batch is still in progress. - // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection - // is only active in this case. Otherwise the expiration order is not guaranteed. - if (!muted.contains(tp)) { + if (!muted.contains(tp) && (metadata.isStale(now) || cluster.leaderFor(tp) == null)) { synchronized (dq) { // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 730ee7cbc98da..15d872051b659 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -358,7 +358,7 @@ public void testExpiredBatches() throws InterruptedException { assertEquals("Stale metadata age must be more than request timeout", true, staleMetadataAgeMs > metadataMaxAgeMs); - Metadata metadata = new Metadata(retryBackoffMs, metadataMaxAgeMs); + Metadata metadata = new 
Metadata(retryBackoffMs, metadataMaxAgeMs, false, requestTimeoutMs); metadata.update(cluster, time.milliseconds()); RecordAccumulator accum = new RecordAccumulator(batchSize, totalSize, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); int appends = batchSize / msgSize; @@ -452,75 +452,6 @@ public void testExpiredBatches() throws InterruptedException { // subtest6 done } - @Test - public void testExpiredBatchesOld() throws InterruptedException { - long retryBackoffMs = 100L; - long lingerMs = 3000L; - int requestTimeout = 60; - - RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); - int appends = 1024 / msgSize; - - // Test batches not in retry - for (int i = 0; i < appends; i++) { - accum.append(tp1, 0L, key, value, null, maxBlockTimeMs); - assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); - } - // Make the batches ready due to batch full - accum.append(tp1, 0L, key, value, null, 0); - Set readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; - assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); - // Advance the clock to expire the batch. 
- time.sleep(requestTimeout + 1); - accum.mutePartition(tp1); - List expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); - assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); - - accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); - assertEquals("The batch should be expired", 1, expiredBatches.size()); - assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); - - // Advance the clock to make the next batch ready due to linger.ms - time.sleep(lingerMs); - assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); - time.sleep(requestTimeout + 1); - - accum.mutePartition(tp1); - expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); - assertEquals("The batch should not be expired when metadata is still available and partition is muted", 0, expiredBatches.size()); - - accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); - assertEquals("The batch should be expired when the partition is not muted", 1, expiredBatches.size()); - assertEquals("No partitions should be ready.", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); - - // Test batches in retry. - // Create a retried batch - accum.append(tp1, 0L, key, value, null, 0); - time.sleep(lingerMs); - readyNodes = accum.ready(cluster, time.milliseconds()).readyNodes; - assertEquals("Our partition's leader should be ready", Collections.singleton(node1), readyNodes); - Map> drained = accum.drain(cluster, readyNodes, Integer.MAX_VALUE, time.milliseconds()); - assertEquals("There should be only one batch.", drained.get(node1.id()).size(), 1); - time.sleep(1000L); - accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); - - // test expiration. 
- time.sleep(requestTimeout + retryBackoffMs); - expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); - assertEquals("The batch should not be expired.", 0, expiredBatches.size()); - time.sleep(1L); - - accum.mutePartition(tp1); - expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); - assertEquals("The batch should not be expired when the partition is muted", 0, expiredBatches.size()); - - accum.unmutePartition(tp1); - expiredBatches = accum.abortExpiredBatchesOld(requestTimeout, time.milliseconds()); - assertEquals("The batch should be expired when the partition is not muted.", 1, expiredBatches.size()); - } - @Test public void testMutedPartitions() throws InterruptedException { long now = time.milliseconds(); From 7831754e7c45cf38a7eae4c69538243d51a883fc Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Thu, 1 Sep 2016 16:25:03 -0700 Subject: [PATCH 08/19] refactored batch expiry logic into Sender --- .../org/apache/kafka/clients/Metadata.java | 23 ++++++++-------- .../kafka/clients/producer/KafkaProducer.java | 3 +-- .../producer/internals/RecordAccumulator.java | 4 +-- .../clients/producer/internals/Sender.java | 10 ++++--- .../internals/RecordAccumulatorTest.java | 26 ++++++++++++------- 5 files changed, 37 insertions(+), 29 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 660f6d898ca23..1af1fae813065 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -49,14 +49,11 @@ public final class Metadata { public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000; private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L; - private static final int DEFAULT_REQUEST_TIMEOUT_MS = 30000; private static final long DEFAULT_RETRY_BACKOFF_MS = 100L; private static final long DEFAULT_METADATA_MAX_AGE_MS = 60 * 60 * 1000L; 
- private final int requestTimeoutMs; private final long refreshBackoffMs; private final long metadataExpireMs; - private final long metadataStaleMs; private int version; private long lastRefreshMs; private long lastSuccessfulRefreshMs; @@ -88,11 +85,9 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs) { * @param topicExpiryEnabled If true, enable expiry of unused topics * @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates. */ -public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, Cluster cluster, ClusterResourceListeners clusterResourceListeners) { + public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled) { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; - this.requestTimeoutMs = requestTimeoutMs; - this.metadataStaleMs = metadataExpireMs + 3 * (requestTimeoutMs + refreshBackoffMs); this.topicExpiryEnabled = topicExpiryEnabled; this.lastRefreshMs = 0L; this.lastSuccessfulRefreshMs = 0L; @@ -131,13 +130,6 @@ public synchronized long timeToNextUpdate(long nowMs) { return Math.max(timeToExpire, timeToAllowUpdate); } - /** - * The metadata is stale if it has taken more than staleMs after last successful refresh. - */ - public synchronized boolean isStale(long nowMs) { - return (nowMs - this.lastSuccessfulRefreshMs) > this.metadataStaleMs; - } - /** * Request an update of the current cluster metadata info, return the current version before the update */ @@ -277,6 +269,13 @@ public synchronized long lastSuccessfulUpdate() { return this.lastSuccessfulRefreshMs; } + /** + * The max allowable age of metadata. 
+ */ + public synchronized long getMetadataMaxAge() { + return this.metadataExpireMs; + } + /** * The metadata refresh backoff in ms */ diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 7b0552f77faf0..cf815a1ca76c4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -292,8 +292,7 @@ private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serial this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), - true, - this.requestTimeoutMs); + true); this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.totalMemorySize, diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index d2dbff1db08d6..fc9c76e2187c1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -224,7 +224,7 @@ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, C * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout * due to metadata being unavailable */ - public List abortExpiredBatches(int requestTimeout, Metadata metadata, long now) { + public List abortExpiredBatches(int requestTimeout, boolean staleMetadataNow, Metadata metadata, long now) { List expiredBatches = new ArrayList<>(); Cluster cluster = metadata.fetch(); int count = 0; @@ -235,7 +235,7 @@ public List abortExpiredBatches(int requestTimeout, Metadata metada // This is to prevent later batches from being expired while an earlier batch is still in 
progress. // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection // is only active in this case. Otherwise the expiration order is not guaranteed. - if (!muted.contains(tp) && (metadata.isStale(now) || cluster.leaderFor(tp) == null)) { + if (!muted.contains(tp) && (staleMetadataNow || cluster.leaderFor(tp) == null)) { synchronized (dq) { // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index c11f99ba59373..1641736099451 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -96,6 +96,9 @@ public class Sender implements Runnable { /* the max time to wait for the server to respond to the request*/ private final int requestTimeout; + + /* the max time to wait before expiring batches. 
*/ + private final long metadataStaleMs; public Sender(KafkaClient client, Metadata metadata, @@ -120,6 +123,7 @@ public Sender(KafkaClient client, this.clientId = clientId; this.sensors = new SenderMetrics(metrics); this.requestTimeout = requestTimeout; + this.metadataStaleMs = metadata.getMetadataMaxAge() + 3 * (requestTimeout + metadata.refreshBackoff()); } /** @@ -207,9 +211,9 @@ void run(long now) { this.accumulator.mutePartition(batch.topicPartition); } } - - if (metadata.isStale(now) || !result.unknownLeaderTopics.isEmpty()) { - List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, metadata, now); + boolean staleMetadataNow = (now - metadata.lastSuccessfulUpdate()) > this.metadataStaleMs; + if (staleMetadataNow || !result.unknownLeaderTopics.isEmpty()) { + List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, staleMetadataNow, metadata, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 15d872051b659..bec74a428f355 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -350,15 +350,16 @@ public void testExpiredBatches() throws InterruptedException { int batchSize = 1024; long totalSize = 10 * 1024; long metadataMaxAgeMs = 5 * 60 * 1000L; // 5 min - long staleMetadataAgeMs = 2 * metadataMaxAgeMs; + long staleMetadataAgeMs = metadataMaxAgeMs + 3 * (requestTimeoutMs + retryBackoffMs); List expiredBatches; RecordAccumulator.ReadyCheckResult result; Set readyNodes; Map> drained; + boolean staleMetadataNow = false; assertEquals("Stale metadata age 
must be more than request timeout", true, staleMetadataAgeMs > metadataMaxAgeMs); - Metadata metadata = new Metadata(retryBackoffMs, metadataMaxAgeMs, false, requestTimeoutMs); + Metadata metadata = new Metadata(retryBackoffMs, metadataMaxAgeMs, false); metadata.update(cluster, time.milliseconds()); RecordAccumulator accum = new RecordAccumulator(batchSize, totalSize, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); int appends = batchSize / msgSize; @@ -373,7 +374,8 @@ public void testExpiredBatches() throws InterruptedException { result = accum.ready(cluster, time.milliseconds()); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), result.readyNodes); - expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); + staleMetadataNow = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, staleMetadataNow, metadata, time.milliseconds()); assertTrue("The batch should not expire because leaders are known and metadata is fresh.", result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0); // subtest1 done @@ -383,8 +385,9 @@ public void testExpiredBatches() throws InterruptedException { result = accum.ready(cluster, time.milliseconds()); assertTrue("No partition should be ready because it's muted", result.readyNodes.isEmpty()); - - expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); + + staleMetadataNow = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, staleMetadataNow, metadata, time.milliseconds()); assertTrue("The batch should not expire because the leaders are known and metadata is fresh.", result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0); @@ -399,7 +402,8 @@ public void testExpiredBatches() throws InterruptedException { // subtest3: test 
the effect of request timeout. time.sleep(requestTimeoutMs); - expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); + staleMetadataNow = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, staleMetadataNow, metadata, time.milliseconds()); assertTrue("The batch should not expire because the leaders are known and metadata is fresh.", result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0); // subtest3 done @@ -423,7 +427,8 @@ public void testExpiredBatches() throws InterruptedException { // subtest5: Test meatadata expiry time.sleep(staleMetadataAgeMs); - expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); + staleMetadataNow = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, staleMetadataNow, metadata, time.milliseconds()); assertEquals("The batch should expire because metadata is no longer fresh.", 2, expiredBatches.size()); // subtest5 done @@ -442,12 +447,13 @@ public void testExpiredBatches() throws InterruptedException { result = accum.ready(cluster, time.milliseconds()); assertEquals("Node should not be ready", 0, result.readyNodes.size()); - - expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); + staleMetadataNow = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, staleMetadataNow, metadata, time.milliseconds()); assertEquals("The batch should not expire because request timeout has not passed.", 0, expiredBatches.size()); time.sleep(requestTimeoutMs + 1); - expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, metadata, time.milliseconds()); + staleMetadataNow = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + expiredBatches 
= accum.abortExpiredBatches(requestTimeoutMs, staleMetadataNow, metadata, time.milliseconds()); assertEquals("The batch should expire because request timeout has passed.", 1, expiredBatches.size()); // subtest6 done } From 304526ef22fa5a01f23340e928709963b0359a3d Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Thu, 1 Sep 2016 16:32:58 -0700 Subject: [PATCH 09/19] restored KafkaProducer back to upstream/trunk --- .../java/org/apache/kafka/clients/producer/KafkaProducer.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index cf815a1ca76c4..3efc7b5cb69ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -290,10 +290,6 @@ private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serial this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG); } - this.metadata = new Metadata(retryBackoffMs, - config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), - true); - this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), this.totalMemorySize, this.compressionType, From 01582a2057775c63227f4111aa104d3c558cb8b5 Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Thu, 1 Sep 2016 17:28:20 -0700 Subject: [PATCH 10/19] refactored batch expiry logic --- .../producer/internals/RecordAccumulator.java | 5 ++-- .../clients/producer/internals/Sender.java | 6 ++--- .../internals/RecordAccumulatorTest.java | 26 +++++++++---------- 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index fc9c76e2187c1..eb703c7d6690d 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -224,9 +224,8 @@ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, C * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout * due to metadata being unavailable */ - public List abortExpiredBatches(int requestTimeout, boolean staleMetadataNow, Metadata metadata, long now) { + public List abortExpiredBatches(int requestTimeout, boolean isMetadataStale, Cluster cluster, long now) { List expiredBatches = new ArrayList<>(); - Cluster cluster = metadata.fetch(); int count = 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque dq = entry.getValue(); @@ -235,7 +234,7 @@ public List abortExpiredBatches(int requestTimeout, boolean staleMe // This is to prevent later batches from being expired while an earlier batch is still in progress. // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection // is only active in this case. Otherwise the expiration order is not guaranteed. 
- if (!muted.contains(tp) && (staleMetadataNow || cluster.leaderFor(tp) == null)) { + if (!muted.contains(tp) && (isMetadataStale || cluster.leaderFor(tp) == null)) { synchronized (dq) { // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 1641736099451..873fe58609076 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -211,9 +211,9 @@ void run(long now) { this.accumulator.mutePartition(batch.topicPartition); } } - boolean staleMetadataNow = (now - metadata.lastSuccessfulUpdate()) > this.metadataStaleMs; - if (staleMetadataNow || !result.unknownLeaderTopics.isEmpty()) { - List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, staleMetadataNow, metadata, now); + boolean isMetadataStale = (now - metadata.lastSuccessfulUpdate()) > this.metadataStaleMs; + if (isMetadataStale || !result.unknownLeaderTopics.isEmpty()) { + List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, isMetadataStale, cluster, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index bec74a428f355..001d938af6408 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -355,7 +355,7 @@ public void 
testExpiredBatches() throws InterruptedException { RecordAccumulator.ReadyCheckResult result; Set readyNodes; Map> drained; - boolean staleMetadataNow = false; + boolean isMetadataStale = false; assertEquals("Stale metadata age must be more than request timeout", true, staleMetadataAgeMs > metadataMaxAgeMs); @@ -374,8 +374,8 @@ public void testExpiredBatches() throws InterruptedException { result = accum.ready(cluster, time.milliseconds()); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), result.readyNodes); - staleMetadataNow = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; - expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, staleMetadataNow, metadata, time.milliseconds()); + isMetadataStale = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds()); assertTrue("The batch should not expire because leaders are known and metadata is fresh.", result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0); // subtest1 done @@ -386,8 +386,8 @@ public void testExpiredBatches() throws InterruptedException { result = accum.ready(cluster, time.milliseconds()); assertTrue("No partition should be ready because it's muted", result.readyNodes.isEmpty()); - staleMetadataNow = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; - expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, staleMetadataNow, metadata, time.milliseconds()); + isMetadataStale = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds()); assertTrue("The batch should not expire because the leaders are known and metadata is fresh.", result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0); @@ -402,8 +402,8 @@ public void 
testExpiredBatches() throws InterruptedException { // subtest3: test the effect of request timeout. time.sleep(requestTimeoutMs); - staleMetadataNow = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; - expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, staleMetadataNow, metadata, time.milliseconds()); + isMetadataStale = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds()); assertTrue("The batch should not expire because the leaders are known and metadata is fresh.", result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0); // subtest3 done @@ -427,8 +427,8 @@ public void testExpiredBatches() throws InterruptedException { // subtest5: Test meatadata expiry time.sleep(staleMetadataAgeMs); - staleMetadataNow = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; - expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, staleMetadataNow, metadata, time.milliseconds()); + isMetadataStale = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds()); assertEquals("The batch should expire because metadata is no longer fresh.", 2, expiredBatches.size()); // subtest5 done @@ -447,13 +447,13 @@ public void testExpiredBatches() throws InterruptedException { result = accum.ready(cluster, time.milliseconds()); assertEquals("Node should not be ready", 0, result.readyNodes.size()); - staleMetadataNow = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; - expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, staleMetadataNow, metadata, time.milliseconds()); + isMetadataStale = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + expiredBatches = 
accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds()); assertEquals("The batch should not expire because request timeout has not passed.", 0, expiredBatches.size()); time.sleep(requestTimeoutMs + 1); - staleMetadataNow = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; - expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, staleMetadataNow, metadata, time.milliseconds()); + isMetadataStale = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds()); assertEquals("The batch should expire because request timeout has passed.", 1, expiredBatches.size()); // subtest6 done } From 3d7baf5fe6124057460446a95ec2b7ee124d517f Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Thu, 1 Sep 2016 17:41:02 -0700 Subject: [PATCH 11/19] added explanation for the staleness determination formula --- .../apache/kafka/clients/producer/internals/Sender.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 873fe58609076..c7c4e8cdd93e6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -123,6 +123,13 @@ public Sender(KafkaClient client, this.clientId = clientId; this.sensors = new SenderMetrics(metrics); this.requestTimeout = requestTimeout; + + /* metadata becomes "stale" for batch expiry purpose when the time since the last successful update exceeds + * the metadataStaleMs value. This value must be greater than the metadata.max.age and some delta to allow + * a few retries. A small number of retries (3) are chosen because metadata retries have no upper bound. 
+ * However, as retries are subject to both regular request timeout and the backoff, staleness determination + * is delayed by that factor. + */ this.metadataStaleMs = metadata.getMetadataMaxAge() + 3 * (requestTimeout + metadata.refreshBackoff()); } From 1b6eba9aa181ef8ecc78de93879f014d724d991a Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Thu, 1 Sep 2016 17:42:56 -0700 Subject: [PATCH 12/19] unused import --- .../kafka/clients/producer/internals/RecordAccumulator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index eb703c7d6690d..84d4258a9abd2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -15,7 +15,6 @@ import java.util.Iterator; import org.apache.kafka.clients.producer.Callback; -import org.apache.kafka.clients.Metadata; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.MetricName; import org.apache.kafka.common.Node; From 90faa04c690e765da914b0e284a534289c76bd63 Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Tue, 13 Sep 2016 16:03:23 -0700 Subject: [PATCH 13/19] updated batch expiry logic documentation --- .../org/apache/kafka/clients/Metadata.java | 2 +- .../producer/internals/RecordAccumulator.java | 18 +++++++-- .../clients/producer/internals/Sender.java | 27 ++++++++----- .../internals/RecordAccumulatorTest.java | 40 ++++++++++--------- 4 files changed, 55 insertions(+), 32 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 1af1fae813065..d7f5a84444344 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java 
@@ -272,7 +272,7 @@ public synchronized long lastSuccessfulUpdate() { /** * The max allowable age of metadata. */ - public synchronized long getMetadataMaxAge() { + public long maxAge() { return this.metadataExpireMs; } diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 84d4258a9abd2..8b3dfe74877a6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -229,10 +229,22 @@ public List abortExpiredBatches(int requestTimeout, boolean isMetad for (Map.Entry> entry : this.batches.entrySet()) { Deque dq = entry.getValue(); TopicPartition tp = entry.getKey(); - // We only check if the batch should be expired if the partition does not have a batch in flight. - // This is to prevent later batches from being expired while an earlier batch is still in progress. - // Note that `muted` is only ever populated if `max.in.flight.request.per.connection=1` so this protection + // We check if the batch should be expired if we know that we can't make progress on a given + // topic-partition. Specifically, we check if + // (1) the partition does not have a batch in flight. + // (2) Either the metadata is too stale or we don't have a leader for a partition. + // + // The first condition prevents later batches from being expired while an earlier batch is + // still in progress. We don't want batches to expire out-of-order. Note that `muted` is only ever + // populated if `max.in.flight.request.per.connection=1` so this protection // is only active in this case. Otherwise the expiration order is not guaranteed. + // + // The second condition allows expiration of lingering batches if we don't have a leader for + // the partition. 
+ // + // Finally, we expire batches if the last metadata refresh was too long ago. We might run in to + // this situation when the producer is disconnected from all the brokers. Note that stale metadata + // is significantly longer than metadata.max.age. if (!muted.contains(tp) && (isMetadataStale || cluster.leaderFor(tp) == null)) { synchronized (dq) { // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index c7c4e8cdd93e6..9353784f895e9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -124,13 +124,7 @@ public Sender(KafkaClient client, this.sensors = new SenderMetrics(metrics); this.requestTimeout = requestTimeout; - /* metadata becomes "stale" for batch expiry purpose when the time since the last successful update exceeds - * the metadataStaleMs value. This value must be greater than the metadata.max.age and some delta to allow - * a few retries. A small number of retries (3) are chosen because metadata retries have no upper bound. - * However, as retries are subject to both regular request timeout and the backoff, staleness determination - * is delayed by that factor. 
- */ - this.metadataStaleMs = metadata.getMetadataMaxAge() + 3 * (requestTimeout + metadata.refreshBackoff()); + this.metadataStaleMs = getMetadataStaleMs(metadata.maxAge(), requestTimeout, metadata.refreshBackoff()); } /** @@ -218,9 +212,9 @@ void run(long now) { this.accumulator.mutePartition(batch.topicPartition); } } - boolean isMetadataStale = (now - metadata.lastSuccessfulUpdate()) > this.metadataStaleMs; - if (isMetadataStale || !result.unknownLeaderTopics.isEmpty()) { - List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, isMetadataStale, cluster, now); + boolean stale = isMetadataStale(now, metadata, this.metadataStaleMs); + if (stale || !result.unknownLeaderTopics.isEmpty()) { + List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, stale, cluster, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount); @@ -388,6 +382,19 @@ public void wakeup() { this.client.wakeup(); } + /* metadata becomes "stale" for batch expiry purpose when the time since the last successful update exceeds + * the metadataStaleMs value. This value must be greater than the metadata.max.age and some delta to account + * for a few retries and transient network disconnections. A small number of retries (3) are chosen because + * metadata retries have no upper bound. However, as retries are subject to both regular request timeout and + * the backoff, staleness determination is delayed by that factor. 
+ */ + static long getMetadataStaleMs(long metadataMaxAge, int requestTimeout, long refreshBackoff) { + return metadataMaxAge + 3 * (requestTimeout + refreshBackoff); + } + + static boolean isMetadataStale(long now, Metadata metadata, long metadataStaleMs) { + return (now - metadata.lastSuccessfulUpdate()) > metadataStaleMs; + } /** * A collection of sensors for the sender */ diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 001d938af6408..9580631d6d6e8 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -350,14 +350,14 @@ public void testExpiredBatches() throws InterruptedException { int batchSize = 1024; long totalSize = 10 * 1024; long metadataMaxAgeMs = 5 * 60 * 1000L; // 5 min - long staleMetadataAgeMs = metadataMaxAgeMs + 3 * (requestTimeoutMs + retryBackoffMs); + long staleMetadataAgeMs = Sender.getMetadataStaleMs(metadataMaxAgeMs, requestTimeoutMs, retryBackoffMs); List expiredBatches; RecordAccumulator.ReadyCheckResult result; Set readyNodes; Map> drained; boolean isMetadataStale = false; - assertEquals("Stale metadata age must be more than request timeout", true, staleMetadataAgeMs > metadataMaxAgeMs); + assertTrue("Stale metadata age must be more than request timeout", staleMetadataAgeMs > metadataMaxAgeMs); Metadata metadata = new Metadata(retryBackoffMs, metadataMaxAgeMs, false); metadata.update(cluster, time.milliseconds()); @@ -374,7 +374,7 @@ public void testExpiredBatches() throws InterruptedException { result = accum.ready(cluster, time.milliseconds()); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), result.readyNodes); - isMetadataStale = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > 
staleMetadataAgeMs; + isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs); expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds()); assertTrue("The batch should not expire because leaders are known and metadata is fresh.", result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0); @@ -386,7 +386,7 @@ public void testExpiredBatches() throws InterruptedException { result = accum.ready(cluster, time.milliseconds()); assertTrue("No partition should be ready because it's muted", result.readyNodes.isEmpty()); - isMetadataStale = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs); expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds()); assertTrue("The batch should not expire because the leaders are known and metadata is fresh.", result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0); @@ -394,19 +394,19 @@ public void testExpiredBatches() throws InterruptedException { accum.unmutePartition(tp1); // subtest2 done - // subtest2: test effect of linger on ready and expired batches + // subtest3: test effect of linger on ready and expired batches time.sleep(lingerMs + 1); // Advance the clock beyond linger.ms. result = accum.ready(cluster, time.milliseconds()); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), result.readyNodes); - // subtest2 done + // subtest3 done - // subtest3: test the effect of request timeout. + // subtest4: test the effect of request timeout. 
time.sleep(requestTimeoutMs); - isMetadataStale = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs); expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds()); assertTrue("The batch should not expire because the leaders are known and metadata is fresh.", result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0); - // subtest3 done + // subtest4 done // drain first batch. drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); @@ -416,23 +416,27 @@ public void testExpiredBatches() throws InterruptedException { drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); assertEquals("Drain again. There should be one batch.", drained.get(node1.id()).size(), 1); - // subtest 4: Test batches in retry. Create a retried batch + // subtest5: Test batches in retry. Create a retried batch accum.append(tp1, 0L, key, value, null, 0); time.sleep(lingerMs + 1); result = accum.ready(cluster, time.milliseconds()); assertEquals("Our partition's leader should be ready", Collections.singleton(node1), result.readyNodes); time.sleep(requestTimeoutMs); accum.reenqueue(drained.get(node1.id()).get(0), time.milliseconds()); - // subtest4 done. + isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs); + expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds()); + assertTrue("The batch should not expire because the leaders are known and metadata is fresh.", + result.unknownLeaderTopics.isEmpty() && expiredBatches.size() == 0); + // subtest5 done. 
- // subtest5: Test meatadata expiry + // subtest6: Test meatadata expiry time.sleep(staleMetadataAgeMs); - isMetadataStale = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs); expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds()); assertEquals("The batch should expire because metadata is no longer fresh.", 2, expiredBatches.size()); - // subtest5 done + // subtest6 done - // subtest 6: Test batches with missing leader. + // subtest7: Test batches with missing leader. int partition4 = 3; TopicPartition tp4 = new TopicPartition(topic, partition4); PartitionInfo part4 = new PartitionInfo(topic, partition4, null, null, null); @@ -447,15 +451,15 @@ public void testExpiredBatches() throws InterruptedException { result = accum.ready(cluster, time.milliseconds()); assertEquals("Node should not be ready", 0, result.readyNodes.size()); - isMetadataStale = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs); expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds()); assertEquals("The batch should not expire because request timeout has not passed.", 0, expiredBatches.size()); time.sleep(requestTimeoutMs + 1); - isMetadataStale = (time.milliseconds() - metadata.lastSuccessfulUpdate()) > staleMetadataAgeMs; + isMetadataStale = Sender.isMetadataStale(time.milliseconds(), metadata, staleMetadataAgeMs); expiredBatches = accum.abortExpiredBatches(requestTimeoutMs, isMetadataStale, cluster, time.milliseconds()); assertEquals("The batch should expire because request timeout has passed.", 1, expiredBatches.size()); - // subtest6 done + // subtest7 done } @Test From 1c8572b93c67bf534a392abea2891c2a98210ff9 Mon Sep 17 00:00:00 2001 From: Sumant 
Tambe Date: Tue, 20 Sep 2016 18:12:37 -0700 Subject: [PATCH 14/19] clean up comments and documentation --- .../org/apache/kafka/clients/Metadata.java | 2 +- .../kafka/clients/producer/KafkaProducer.java | 1 + .../producer/internals/RecordAccumulator.java | 31 ++++++++++--------- 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index d7f5a84444344..560e95539bd41 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -50,7 +50,7 @@ public final class Metadata { public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000; private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L; private static final long DEFAULT_RETRY_BACKOFF_MS = 100L; - private static final long DEFAULT_METADATA_MAX_AGE_MS = 60 * 60 * 1000L; + private static final long DEFAULT_METADATA_MAX_AGE_MS = 300 * 1000L; private final long refreshBackoffMs; private final long metadataExpireMs; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 3efc7b5cb69ce..13d5a70db70af 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -334,6 +334,7 @@ private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serial } catch (Throwable t) { // call close methods if internal objects are already constructed // this is to prevent resource leak. 
see KAFKA-2121 + t.printStackTrace(); close(0, TimeUnit.MILLISECONDS, true); // now propagate the exception throw new KafkaException("Failed to construct kafka producer", t); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 8b3dfe74877a6..b2ee58d61cedb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -220,8 +220,9 @@ private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, C } /** - * Abort the batches that have been sitting in RecordAccumulator for more than the configured requestTimeout - * due to metadata being unavailable + * Abort the batches that have been sitting in RecordAccumulator + * 1. longer than the requestTimeout when metadata is fresh; or + * 2. longer than {@link Sender#metadataStaleMs} when metadata is stale. */ public List abortExpiredBatches(int requestTimeout, boolean isMetadataStale, Cluster cluster, long now) { List expiredBatches = new ArrayList<>(); @@ -229,22 +230,22 @@ public List abortExpiredBatches(int requestTimeout, boolean isMetad for (Map.Entry> entry : this.batches.entrySet()) { Deque dq = entry.getValue(); TopicPartition tp = entry.getKey(); - // We check if the batch should be expired if we know that we can't make progress on a given - // topic-partition. Specifically, we check if - // (1) the partition does not have a batch in flight. - // (2) Either the metadata is too stale or we don't have a leader for a partition. + + // In the case where the user wishes to achieve strict ordering, (i.e., + // max.in.flight.request.per.connection=1) the muted membership condition helps ensure that batches also + // expire in order. Partitions are muted only if strict ordering is enabled and there are in-flight batches. 
+ // This prevents later batches from being expired while an earlier batch is still in progress. // - // The first condition prevents later batches from being expired while an earlier batch is - // still in progress. We don't want batches to expire out-of-order. Note that `muted` is only ever - // populated if `max.in.flight.request.per.connection=1` so this protection - // is only active in this case. Otherwise the expiration order is not guaranteed. + // Second, we expire batches when we know that we can't make progress on a given topic-partition. + // Specifically, we check if + // (1) the partition does not have an in-flight batch. + // (2) Either the metadata is too stale or we don't have a leader for a partition. // - // The second condition allows expiration of lingering batches if we don't have a leader for - // the partition. + // The second condition allows expiration of ready batches if we don't have a leader for + // the partition. // - // Finally, we expire batches if the last metadata refresh was too long ago. We might run in to - // this situation when the producer is disconnected from all the brokers. Note that stale metadata - // is significantly longer than metadata.max.age. + // Finally, we expire batches if the last metadata refresh was too long ago. I.e., > {@link Sender#metadataStaleMs}. + // We might run in to this situation when the producer is disconnected from all the brokers. 
if (!muted.contains(tp) && (isMetadataStale || cluster.leaderFor(tp) == null)) { synchronized (dq) { // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut From b4c818ef7295f3c7203747fa25839b10a847e7b2 Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Wed, 21 Sep 2016 14:06:46 -0700 Subject: [PATCH 15/19] rebased on upstream/trunk --- .../src/main/java/org/apache/kafka/clients/Metadata.java | 9 +++------ .../producer/internals/RecordAccumulatorTest.java | 3 ++- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 560e95539bd41..4ea8d4afcba88 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -74,7 +74,7 @@ public Metadata() { } public Metadata(long refreshBackoffMs, long metadataExpireMs) { - this(refreshBackoffMs, metadataExpireMs, false, Cluster.empty(), new ClusterResourceListeners()); + this(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners()); } /** @@ -85,11 +85,8 @@ public Metadata(long refreshBackoffMs, long metadataExpireMs) { * @param topicExpiryEnabled If true, enable expiry of unused topics * @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates. 
*/ -<<<<<<< HEAD - public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, Cluster cluster, ClusterResourceListeners clusterResourceListeners) { -======= - public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled) { ->>>>>>> refactored batch expiry logic into Sender + + public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; this.topicExpiryEnabled = topicExpiryEnabled; diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 9580631d6d6e8..9a909b7d3dd66 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.LogEntry; @@ -359,7 +360,7 @@ public void testExpiredBatches() throws InterruptedException { assertTrue("Stale metadata age must be more than request timeout", staleMetadataAgeMs > metadataMaxAgeMs); - Metadata metadata = new Metadata(retryBackoffMs, metadataMaxAgeMs, false); + Metadata metadata = new Metadata(retryBackoffMs, metadataMaxAgeMs, false, new ClusterResourceListeners()); metadata.update(cluster, time.milliseconds()); RecordAccumulator accum = new RecordAccumulator(batchSize, totalSize, CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time); int appends = batchSize 
/ msgSize; From 538a90a4f90ec47d6615d528cecff5318e34f734 Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Thu, 22 Sep 2016 14:24:27 -0700 Subject: [PATCH 16/19] updating batch expiry condition to include retries --- .../kafka/clients/producer/internals/RecordAccumulator.java | 3 ++- .../org/apache/kafka/clients/producer/internals/Sender.java | 6 +++--- .../clients/producer/internals/RecordAccumulatorTest.java | 3 ++- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index b2ee58d61cedb..50d793b2878d8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -246,7 +246,8 @@ public List abortExpiredBatches(int requestTimeout, boolean isMetad // // Finally, we expire batches if the last metadata refresh was too long ago. I.e., > {@link Sender#metadataStaleMs}. // We might run in to this situation when the producer is disconnected from all the brokers. 
- if (!muted.contains(tp) && (isMetadataStale || cluster.leaderFor(tp) == null)) { + boolean guaranteeExpirationOrder = muted.contains(tp); + if (!guaranteeExpirationOrder && (isMetadataStale || cluster.leaderFor(tp) == null)) { synchronized (dq) { // iterate over the batches and expire them if they have been in the accumulator for more than requestTimeOut RecordBatch lastBatch = dq.peekLast(); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 9353784f895e9..3048a29edc6a3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -124,7 +124,7 @@ public Sender(KafkaClient client, this.sensors = new SenderMetrics(metrics); this.requestTimeout = requestTimeout; - this.metadataStaleMs = getMetadataStaleMs(metadata.maxAge(), requestTimeout, metadata.refreshBackoff()); + this.metadataStaleMs = getMetadataStaleMs(metadata.maxAge(), requestTimeout, metadata.refreshBackoff(), retries); } /** @@ -388,8 +388,8 @@ public void wakeup() { * metadata retries have no upper bound. However, as retries are subject to both regular request timeout and * the backoff, staleness determination is delayed by that factor. 
*/ - static long getMetadataStaleMs(long metadataMaxAge, int requestTimeout, long refreshBackoff) { - return metadataMaxAge + 3 * (requestTimeout + refreshBackoff); + static long getMetadataStaleMs(long metadataMaxAge, int requestTimeout, long refreshBackoff, int retries) { + return metadataMaxAge + Math.max(retries, 1) * (requestTimeout + refreshBackoff); } static boolean isMetadataStale(long now, Metadata metadata, long metadataStaleMs) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index 9a909b7d3dd66..21a2ec16c835f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -351,7 +351,8 @@ public void testExpiredBatches() throws InterruptedException { int batchSize = 1024; long totalSize = 10 * 1024; long metadataMaxAgeMs = 5 * 60 * 1000L; // 5 min - long staleMetadataAgeMs = Sender.getMetadataStaleMs(metadataMaxAgeMs, requestTimeoutMs, retryBackoffMs); + int retries = 1; + long staleMetadataAgeMs = Sender.getMetadataStaleMs(metadataMaxAgeMs, requestTimeoutMs, retryBackoffMs, retries); List expiredBatches; RecordAccumulator.ReadyCheckResult result; Set readyNodes; From 348a341e6432d6ea4b3eb5e2dc207e7e40623a9a Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Thu, 22 Sep 2016 15:07:16 -0700 Subject: [PATCH 17/19] replaced metadataExpireMs with metadataMaxAgeMs for consistency with common config names --- .../org/apache/kafka/clients/Metadata.java | 18 +++++++++--------- .../kafka/clients/producer/KafkaProducer.java | 1 - .../clients/producer/internals/Sender.java | 6 +++--- .../org/apache/kafka/clients/MetadataTest.java | 14 +++++++------- 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java 
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 4ea8d4afcba88..781327d91e2ff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -53,7 +53,7 @@ public final class Metadata { private static final long DEFAULT_METADATA_MAX_AGE_MS = 300 * 1000L; private final long refreshBackoffMs; - private final long metadataExpireMs; + private final long metadataMaxAgeMs; private int version; private long lastRefreshMs; private long lastSuccessfulRefreshMs; @@ -73,22 +73,22 @@ public Metadata() { this(DEFAULT_RETRY_BACKOFF_MS, DEFAULT_METADATA_MAX_AGE_MS); } - public Metadata(long refreshBackoffMs, long metadataExpireMs) { - this(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners()); + public Metadata(long refreshBackoffMs, long metadataMaxAgeMs) { + this(refreshBackoffMs, metadataMaxAgeMs, false, new ClusterResourceListeners()); } /** * Create a new Metadata instance * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy * polling - * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh + * @param metadataMaxAgeMs The maximum amount of time that metadata can be retained without refresh * @param topicExpiryEnabled If true, enable expiry of unused topics * @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates. 
*/ - public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) { + public Metadata(long refreshBackoffMs, long metadataMaxAgeMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) { this.refreshBackoffMs = refreshBackoffMs; - this.metadataExpireMs = metadataExpireMs; + this.metadataMaxAgeMs = metadataMaxAgeMs; this.topicExpiryEnabled = topicExpiryEnabled; this.lastRefreshMs = 0L; this.lastSuccessfulRefreshMs = 0L; @@ -122,7 +122,7 @@ public synchronized void add(String topic) { * is now */ public synchronized long timeToNextUpdate(long nowMs) { - long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0); + long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataMaxAgeMs - nowMs, 0); long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs; return Math.max(timeToExpire, timeToAllowUpdate); } @@ -269,8 +269,8 @@ public synchronized long lastSuccessfulUpdate() { /** * The max allowable age of metadata. */ - public long maxAge() { - return this.metadataExpireMs; + public long maxAgeMs() { + return this.metadataMaxAgeMs; } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 13d5a70db70af..3efc7b5cb69ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -334,7 +334,6 @@ private KafkaProducer(ProducerConfig config, Serializer keySerializer, Serial } catch (Throwable t) { // call close methods if internal objects are already constructed // this is to prevent resource leak. 
see KAFKA-2121 - t.printStackTrace(); close(0, TimeUnit.MILLISECONDS, true); // now propagate the exception throw new KafkaException("Failed to construct kafka producer", t); diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 3048a29edc6a3..1cb3b04d9db14 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -124,7 +124,7 @@ public Sender(KafkaClient client, this.sensors = new SenderMetrics(metrics); this.requestTimeout = requestTimeout; - this.metadataStaleMs = getMetadataStaleMs(metadata.maxAge(), requestTimeout, metadata.refreshBackoff(), retries); + this.metadataStaleMs = getMetadataStaleMs(metadata.maxAgeMs(), requestTimeout, metadata.refreshBackoff(), retries); } /** @@ -388,8 +388,8 @@ public void wakeup() { * metadata retries have no upper bound. However, as retries are subject to both regular request timeout and * the backoff, staleness determination is delayed by that factor. 
*/ - static long getMetadataStaleMs(long metadataMaxAge, int requestTimeout, long refreshBackoff, int retries) { - return metadataMaxAge + Math.max(retries, 1) * (requestTimeout + refreshBackoff); + static long getMetadataStaleMs(long metadataMaxAgeMs, int requestTimeoutMs, long refreshBackoffMs, int retries) { + return metadataMaxAgeMs + Math.max(retries, 1) * (requestTimeoutMs + refreshBackoffMs); } static boolean isMetadataStale(long now, Metadata metadata, long metadataStaleMs) { diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 333a072065bb4..bfa934f5ad7fa 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -40,8 +40,8 @@ public class MetadataTest { private long refreshBackoffMs = 100; - private long metadataExpireMs = 1000; - private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs); + private long metadataMaxAgeMs = 1000; + private Metadata metadata = new Metadata(refreshBackoffMs, metadataMaxAgeMs); private AtomicReference backgroundError = new AtomicReference(); @After @@ -75,7 +75,7 @@ public void testMetadata() throws Exception { t1.join(); t2.join(); assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); - time += metadataExpireMs; + time += metadataMaxAgeMs; assertTrue("Update needed due to stale metadata.", metadata.timeToNextUpdate(time) == 0); } @@ -153,7 +153,7 @@ public void testClusterListenerGetsNotifiedOfUpdate() { MockClusterResourceListener mockClusterListener = new MockClusterResourceListener(); ClusterResourceListeners listeners = new ClusterResourceListeners(); listeners.maybeAdd(mockClusterListener); - metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, listeners); + metadata = new Metadata(refreshBackoffMs, metadataMaxAgeMs, false, listeners); String hostName = "www.example.com"; Cluster 
cluster = Cluster.bootstrap(Arrays.asList(new InetSocketAddress(hostName, 9002))); @@ -246,7 +246,7 @@ public void onMetadataUpdate(Cluster cluster) { @Test public void testTopicExpiry() throws Exception { - metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new ClusterResourceListeners()); + metadata = new Metadata(refreshBackoffMs, metadataMaxAgeMs, true, new ClusterResourceListeners()); // Test that topic is expired if not used within the expiry interval long time = 0; @@ -278,7 +278,7 @@ public void testTopicExpiry() throws Exception { @Test public void testNonExpiringMetadata() throws Exception { - metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, new ClusterResourceListeners()); + metadata = new Metadata(refreshBackoffMs, metadataMaxAgeMs, false, new ClusterResourceListeners()); // Test that topic is not expired if not used within the expiry interval long time = 0; @@ -302,7 +302,7 @@ public void testNonExpiringMetadata() throws Exception { HashSet topics = new HashSet<>(); topics.add("topic4"); metadata.setTopics(topics); - time += metadataExpireMs * 2; + time += metadataMaxAgeMs * 2; metadata.update(Cluster.empty(), time); assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4")); } From a5bfc6a5e12333fe746a419716151d8ca23a42f1 Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Fri, 23 Sep 2016 11:00:36 -0700 Subject: [PATCH 18/19] fixing comments and consolidating static constants --- .../org/apache/kafka/clients/Metadata.java | 21 +++++++++---------- .../clients/consumer/ConsumerConfig.java | 6 ++++-- .../clients/producer/internals/Sender.java | 6 +++--- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 781327d91e2ff..0e7bb7b598bff 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -12,6 +12,7 @@ */ package org.apache.kafka.clients; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.Node; @@ -49,10 +50,8 @@ public final class Metadata { public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000; private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L; - private static final long DEFAULT_RETRY_BACKOFF_MS = 100L; - private static final long DEFAULT_METADATA_MAX_AGE_MS = 300 * 1000L; - private final long refreshBackoffMs; + private final long retryBackoffMs; private final long metadataMaxAgeMs; private int version; private long lastRefreshMs; @@ -70,24 +69,24 @@ public final class Metadata { * Create a metadata instance with reasonable defaults */ public Metadata() { - this(DEFAULT_RETRY_BACKOFF_MS, DEFAULT_METADATA_MAX_AGE_MS); + this(ConsumerConfig.DEFAULT_RETRY_BACKOFF_MS, ConsumerConfig.DEFAULT_METADATA_MAX_AGE_MS); } - public Metadata(long refreshBackoffMs, long metadataMaxAgeMs) { - this(refreshBackoffMs, metadataMaxAgeMs, false, new ClusterResourceListeners()); + public Metadata(long retryBackoffMs, long metadataMaxAgeMs) { + this(retryBackoffMs, metadataMaxAgeMs, false, new ClusterResourceListeners()); } /** * Create a new Metadata instance - * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy + * @param retryBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy * polling * @param metadataMaxAgeMs The maximum amount of time that metadata can be retained without refresh * @param topicExpiryEnabled If true, enable expiry of unused topics * @param clusterResourceListeners List of ClusterResourceListeners which will receive metadata updates. 
*/ - public Metadata(long refreshBackoffMs, long metadataMaxAgeMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) { - this.refreshBackoffMs = refreshBackoffMs; + public Metadata(long retryBackoffMs, long metadataMaxAgeMs, boolean topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) { + this.retryBackoffMs = retryBackoffMs; this.metadataMaxAgeMs = metadataMaxAgeMs; this.topicExpiryEnabled = topicExpiryEnabled; this.lastRefreshMs = 0L; @@ -123,7 +122,7 @@ public synchronized void add(String topic) { */ public synchronized long timeToNextUpdate(long nowMs) { long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataMaxAgeMs - nowMs, 0); - long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs; + long timeToAllowUpdate = this.lastRefreshMs + this.retryBackoffMs - nowMs; return Math.max(timeToExpire, timeToAllowUpdate); } @@ -277,7 +276,7 @@ public long maxAgeMs() { * The metadata refresh backoff in ms */ public long refreshBackoff() { - return refreshBackoffMs; + return retryBackoffMs; } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index ae791b0d46dbf..b72a4d454e414 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -131,6 +131,7 @@ public class ConsumerConfig extends AbstractConfig { /** metadata.max.age.ms */ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; + public static final long DEFAULT_METADATA_MAX_AGE_MS = 5 * 60 * 1000L; /** * max.partition.fetch.bytes @@ -163,6 +164,7 @@ public class ConsumerConfig extends AbstractConfig { * retry.backoff.ms */ public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG; + public static final 
long DEFAULT_RETRY_BACKOFF_MS = 100L; /** * metrics.sample.window.ms @@ -236,7 +238,7 @@ public class ConsumerConfig extends AbstractConfig { PARTITION_ASSIGNMENT_STRATEGY_DOC) .define(METADATA_MAX_AGE_CONFIG, Type.LONG, - 5 * 60 * 1000, + DEFAULT_METADATA_MAX_AGE_MS, atLeast(0), Importance.LOW, CommonClientConfigs.METADATA_MAX_AGE_DOC) @@ -300,7 +302,7 @@ public class ConsumerConfig extends AbstractConfig { CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC) .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, - 100L, + DEFAULT_RETRY_BACKOFF_MS, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 1cb3b04d9db14..c1e0259b7ee7e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -384,9 +384,9 @@ public void wakeup() { /* metadata becomes "stale" for batch expiry purpose when the time since the last successful update exceeds * the metadataStaleMs value. This value must be greater than the metadata.max.age and some delta to account - * for a few retries and transient network disconnections. A small number of retries (3) are chosen because - * metadata retries have no upper bound. However, as retries are subject to both regular request timeout and - * the backoff, staleness determination is delayed by that factor. + * for retries and transient network disconnections. There must be enough time for at least one retry. + * As retries are subject to both regular request timeout and the backoff, staleness determination is delayed + * by that factor. 
*/ static long getMetadataStaleMs(long metadataMaxAgeMs, int requestTimeoutMs, long refreshBackoffMs, int retries) { return metadataMaxAgeMs + Math.max(retries, 1) * (requestTimeoutMs + refreshBackoffMs); From 4aa79b750ca352f5f0df08629a6854db2649f245 Mon Sep 17 00:00:00 2001 From: Sumant Tambe Date: Thu, 10 Nov 2016 11:17:44 -0800 Subject: [PATCH 19/19] updated retries --- .../org/apache/kafka/clients/producer/internals/Sender.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index c1e0259b7ee7e..7a0b61fa7fedc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -389,7 +389,7 @@ public void wakeup() { * by that factor. */ static long getMetadataStaleMs(long metadataMaxAgeMs, int requestTimeoutMs, long refreshBackoffMs, int retries) { - return metadataMaxAgeMs + Math.max(retries, 1) * (requestTimeoutMs + refreshBackoffMs); + return metadataMaxAgeMs + (retries + 1) * (requestTimeoutMs + refreshBackoffMs); } static boolean isMetadataStale(long now, Metadata metadata, long metadataStaleMs) {