Skip to content

Commit 71299c2

Browse files
KAFKA-19972: Bump delivery count on session release (#21092)
If a client application crashes then it can go in infinte loop where same records will be delivered. Though previously we chose to decrease the delivery count on session release as we didn't have throttling support. Now when we do then it makes sense to bump the delivery count on session close. Also as share-groups clients should ideally not have pre-fetched data hence it's safe to bump the delivery count on session release. I have not removed the code to decrease the delivery count as that functionality is well tested and we might need at the time of pre-fetching support or in cases where we do need not to bump the delivery count, in future. Reviewers: Andrew Schofield <aschofield@confluent.io>
1 parent f662350 commit 71299c2

File tree

3 files changed

+49
-74
lines changed

3 files changed

+49
-74
lines changed

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2218,29 +2218,6 @@ public void testShareConsumerAfterCoordinatorMovement() throws Exception {
22182218
verifyShareGroupStateTopicRecordsProduced();
22192219
}
22202220

2221-
@ClusterTest
2222-
public void testDeliveryCountNotIncreaseAfterSessionClose() {
2223-
alterShareAutoOffsetReset("group1", "earliest");
2224-
try (Producer<byte[], byte[]> producer = createProducer()) {
2225-
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
2226-
// We write 10 records to the topic, so they would be written from offsets 0-9 on the topic.
2227-
for (int i = 0; i < 10; i++) {
2228-
assertDoesNotThrow(() -> producer.send(record).get(), "Failed to send records");
2229-
}
2230-
}
2231-
2232-
// Perform the fetch, close in a loop.
2233-
for (int count = 0; count < ShareGroupConfig.SHARE_GROUP_DELIVERY_COUNT_LIMIT_DEFAULT; count++) {
2234-
consumeMessages(new AtomicInteger(0), 10, "group1", 1, 10, false);
2235-
}
2236-
2237-
// If the delivery count is increased, consumer will get nothing.
2238-
int consumedMessageCount = consumeMessages(new AtomicInteger(0), 10, "group1", 1, 10, true);
2239-
// The records returned belong to offsets 0-9.
2240-
assertEquals(10, consumedMessageCount);
2241-
verifyShareGroupStateTopicRecordsProduced();
2242-
}
2243-
22442221
@ClusterTest
22452222
public void testDeliveryCountDifferentBehaviorWhenClosingSessionWithExplicitAcknowledgement() {
22462223
alterShareAutoOffsetReset("group1", "earliest");
@@ -2270,13 +2247,13 @@ public void testDeliveryCountDifferentBehaviorWhenClosingSessionWithExplicitAckn
22702247
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 2);
22712248
assertEquals(2, records.count());
22722249
assertEquals((short) 2, records.records(tp).get(0).deliveryCount().get());
2273-
assertEquals((short) 1, records.records(tp).get(1).deliveryCount().get());
2250+
assertEquals((short) 2, records.records(tp).get(1).deliveryCount().get());
22742251
}
22752252
}
22762253

22772254
@ClusterTest(
22782255
serverProperties = {
2279-
@ClusterConfigProperty(key = "group.share.delivery.count.limit", value = "2"),
2256+
@ClusterConfigProperty(key = "group.share.delivery.count.limit", value = "3"),
22802257
}
22812258
)
22822259
public void testBehaviorOnDeliveryCountBoundary() {
@@ -2304,15 +2281,14 @@ public void testBehaviorOnDeliveryCountBoundary() {
23042281
records = waitedPoll(shareConsumer, 2500L, 1);
23052282
assertEquals(1, records.count());
23062283
assertEquals((short) 2, records.records(tp).get(0).deliveryCount().get());
2307-
23082284
}
23092285

23102286
// Start again and same record should be delivered
23112287
try (ShareConsumer<byte[], byte[]> shareConsumer = createShareConsumer("group1", Map.of())) {
23122288
shareConsumer.subscribe(Set.of(tp.topic()));
23132289
ConsumerRecords<byte[], byte[]> records = waitedPoll(shareConsumer, 2500L, 1);
23142290
assertEquals(1, records.count());
2315-
assertEquals((short) 2, records.records(tp).get(0).deliveryCount().get());
2291+
assertEquals((short) 3, records.records(tp).get(0).deliveryCount().get());
23162292
}
23172293
}
23182294

@@ -2369,9 +2345,9 @@ public void testComplexShareConsumer() throws Exception {
23692345
// Let the complex consumer read the messages.
23702346
service.schedule(() -> prodState.done().set(true), 5L, TimeUnit.SECONDS);
23712347

2372-
// All messages which can be read are read, some would be redelivered (roughly 3 times the records produced).
2348+
// All messages which can be read are read, some would be redelivered (roughly 2 times the records produced).
23732349
TestUtils.waitForCondition(complexCons1::isDone, 45_000L, () -> "did not close!");
2374-
int delta = complexCons1.recordsRead() - (int) (prodState.count().get() * 3 * 0.95); // 3 times with margin of error (5%).
2350+
int delta = complexCons1.recordsRead() - (int) (prodState.count().get() * 2 * 0.95); // 2 times with margin of error (5%).
23752351

23762352
assertTrue(delta > 0,
23772353
String.format("Producer (%d) and share consumer (%d) record count mismatch.", prodState.count().get(), complexCons1.recordsRead()));

core/src/main/java/kafka/server/share/SharePartition.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1116,7 +1116,7 @@ private Optional<Throwable> releaseAcquiredRecordsForPerOffsetBatch(String membe
11161116
// These records were fetched but they were not actually delivered to the client.
11171117
InFlightState updateResult = offsetState.getValue().startStateTransition(
11181118
offsetState.getKey() < startOffset ? RecordState.ARCHIVED : recordState,
1119-
DeliveryCountOps.DECREASE,
1119+
DeliveryCountOps.NO_OP,
11201120
this.maxDeliveryCount,
11211121
EMPTY_MEMBER_ID
11221122
);
@@ -1158,7 +1158,7 @@ private Optional<Throwable> releaseAcquiredRecordsForCompleteBatch(String member
11581158
if (inFlightBatch.batchState() == RecordState.ACQUIRED) {
11591159
InFlightState updateResult = inFlightBatch.startBatchStateTransition(
11601160
inFlightBatch.lastOffset() < startOffset ? RecordState.ARCHIVED : recordState,
1161-
DeliveryCountOps.DECREASE,
1161+
DeliveryCountOps.NO_OP,
11621162
this.maxDeliveryCount,
11631163
EMPTY_MEMBER_ID
11641164
);

0 commit comments

Comments
 (0)