Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordBatch;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand All @@ -34,14 +38,22 @@ public class ProduceRequestResult {
private final CountDownLatch latch = new CountDownLatch(1);
private final TopicPartition topicPartition;

/**
* List of dependent ProduceRequestResults created when this batch is split.
* When a batch is too large to send, it's split into multiple smaller batches.
* The original batch's ProduceRequestResult tracks all the split batches here
* so that flush() can wait for all splits to complete via awaitAllDependents().
*/
private final List<ProduceRequestResult> dependentResults = new ArrayList<>();
Comment thread
shashankhs11 marked this conversation as resolved.

private volatile Long baseOffset = null;
private volatile long logAppendTime = RecordBatch.NO_TIMESTAMP;
private volatile Function<Integer, RuntimeException> errorsByIndex;

/**
* Create an instance of this class.
*
* @param topicPartition The topic and partition to which this record set was sent was sent
* @param topicPartition The topic and partition to which this record set was sent
*/
public ProduceRequestResult(TopicPartition topicPartition) {
this.topicPartition = topicPartition;
Expand Down Expand Up @@ -70,7 +82,29 @@ public void done() {
}

/**
* Await the completion of this request
* Add a dependent ProduceRequestResult.
* This is used when a batch is split into multiple batches - in some cases like flush(), the original
* batch's result should not complete until all split batches have completed.
*
* @param dependentResult The dependent result to wait for
*/
public void addDependent(ProduceRequestResult dependentResult) {
    // Guard the list with its own monitor; awaitAllDependents() snapshots it
    // under the same lock, so additions and reads never interleave unsafely.
    synchronized (this.dependentResults) {
        this.dependentResults.add(dependentResult);
    }
}

/**
* Await the completion of this request.
*
* This only waits for THIS request's latch and not dependent results.
* When a batch is split into multiple batches, dependent results are created and tracked
* separately, but this method does not wait for them. Individual record futures automatically
* handle waiting for their respective split batch via {@link FutureRecordMetadata#chain(FutureRecordMetadata)},
* which redirects the future to point to the correct split batch's result.
*
* For flush() semantics that require waiting for all dependent results, use
* {@link #awaitAllDependents()}.
*/
public void await() throws InterruptedException {
latch.await();
Expand All @@ -86,6 +120,34 @@ public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return latch.await(timeout, unit);
}

/**
* Await the completion of this request and all the dependent requests.
*
* This method is used by flush() to ensure all split batches have completed before
* returning. This method waits for all dependent {@link ProduceRequestResult}s that
* were created when the batch was split.
*
* @throws InterruptedException if the thread is interrupted while waiting
*/
public void awaitAllDependents() throws InterruptedException {
    // Breadth-first walk over this result and every transitively dependent
    // (split-batch) result, blocking on each latch in turn.
    Queue<ProduceRequestResult> pending = new ArrayDeque<>();
    pending.add(this);

    for (ProduceRequestResult current = pending.poll(); current != null; current = pending.poll()) {
        // Block until this particular result has completed.
        current.latch.await();

        // Take a consistent snapshot of the dependents under the lock;
        // the actual waiting on them happens outside the lock.
        synchronized (current.dependentResults) {
            pending.addAll(current.dependentResults);
        }
    }
}

/**
* The base offset for the request (the first offset in the record set)
*/
Expand Down Expand Up @@ -127,6 +189,15 @@ public TopicPartition topicPartition() {

/**
* Has the request completed?
*
* This method only checks if THIS request has completed and not its dependent results.
* When a batch is split into multiple batches, the dependent split batches are tracked
* separately. Individual record futures handle waiting for their respective split
* batch via {@link FutureRecordMetadata#chain(FutureRecordMetadata)}, which updates the
* {@code nextRecordMetadata} pointer to follow the correct split batch.
*
* For flush() semantics that require waiting for all dependent results, use
* {@link #awaitAllDependents()}.
*/
public boolean completed() {
return this.latch.getCount() == 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,10 +321,16 @@ private void completeFutureAndFireCallbacks(
}

public Deque<ProducerBatch> split(int splitBatchSize) {
Deque<ProducerBatch> batches = new ArrayDeque<>();
MemoryRecords memoryRecords = recordsBuilder.build();
RecordBatch recordBatch = validateAndGetRecordBatch();
Deque<ProducerBatch> batches = splitRecordsIntoBatches(recordBatch, splitBatchSize);
finalizeSplitBatches(batches);
return batches;
}

private RecordBatch validateAndGetRecordBatch() {
MemoryRecords memoryRecords = recordsBuilder.build();
Iterator<MutableRecordBatch> recordBatchIter = memoryRecords.batches().iterator();

if (!recordBatchIter.hasNext())
throw new IllegalStateException("Cannot split an empty producer batch.");

Expand All @@ -336,6 +342,11 @@ public Deque<ProducerBatch> split(int splitBatchSize) {
if (recordBatchIter.hasNext())
throw new IllegalArgumentException("A producer batch should only have one record batch.");

return recordBatch;
}

private Deque<ProducerBatch> splitRecordsIntoBatches(RecordBatch recordBatch, int splitBatchSize) {
Deque<ProducerBatch> batches = new ArrayDeque<>();
Iterator<Thunk> thunkIter = thunks.iterator();
// We always allocate batch size because we are already splitting a big batch.
// And we also Retain the create time of the original batch.
Expand All @@ -362,9 +373,23 @@ public Deque<ProducerBatch> split(int splitBatchSize) {
batch.closeForRecordAppends();
}

return batches;
}

/**
 * Completes the original batch's future once its records have been moved into
 * the split batches, and carries the producer state over to those batches.
 *
 * @param batches the split batches produced from this batch's records
 */
private void finalizeSplitBatches(Deque<ProducerBatch> batches) {
    // Register every split batch's result as a dependent of the original
    // future so that flush()-style waiters can track the split batches too.
    for (ProducerBatch piece : batches) {
        produceFuture.addDependent(piece.produceFuture);
    }

    // The original batch itself is finished: it completes exceptionally with
    // RecordBatchTooLargeException for any record index queried against it.
    produceFuture.set(ProduceResponse.INVALID_OFFSET, NO_TIMESTAMP, index -> new RecordBatchTooLargeException());
    produceFuture.done();

    assignProducerStateToBatches(batches);
}

private void assignProducerStateToBatches(Deque<ProducerBatch> batches) {
if (hasSequence()) {
int sequence = baseSequence();
ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(producerId(), producerEpoch());
Expand All @@ -373,7 +398,6 @@ public Deque<ProducerBatch> split(int splitBatchSize) {
sequence += newBatch.recordCount;
}
}
return batches;
}

private ProducerBatch createBatchOffAccumulatorForRecord(Record record, int batchSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1076,8 +1076,13 @@ public void awaitFlushCompletion() throws InterruptedException {
// We must be careful not to hold a reference to the ProduceBatch(s) so that garbage
// collection can occur on the contents.
// The sender will remove ProducerBatch(s) from the original incomplete collection.
//
// We use awaitAllDependents() here instead of await() to ensure that if any batch
// was split into multiple batches, we wait for all the split batches to complete.
// This is required to guarantee that all records sent before flush()
// must be fully complete, including records in split batches.
for (ProduceRequestResult result : this.incomplete.requestResults())
result.await();
result.awaitAllDependents();
} finally {
this.flushesInProgress.decrementAndGet();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.MetadataResponse.PartitionMetadata;
Expand Down Expand Up @@ -71,7 +72,9 @@
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand Down Expand Up @@ -1066,6 +1069,41 @@ public void testSplitAndReenqueue() throws ExecutionException, InterruptedExcept
assertEquals(1, future2.get().offset());
}

// Tests the existing hasRoomFor() behaviour:
Comment thread
shashankhs11 marked this conversation as resolved.
// it allows the first record regardless of its size
// but rejects any subsequent record once the buffer is exceeded
Comment thread
shashankhs11 marked this conversation as resolved.
@Test
public void testHasRoomForAllowsOversizedFirstRecordButRejectsSubsequentRecords() {
    long timestamp = time.milliseconds();
    int tinyBatchSize = 1024;

    // A value far larger than the batch size limit (4KB vs. 1KB buffer).
    byte[] oversizedValue = new byte[4 * 1024];

    // Builder backed by a buffer too small to hold the oversized record.
    ByteBuffer backing = ByteBuffer.allocate(tinyBatchSize);
    MemoryRecordsBuilder builder = MemoryRecords.builder(backing, Compression.NONE, TimestampType.CREATE_TIME, 0L);

    // With no records appended yet, hasRoomFor() must accept any record size.
    boolean firstFits = builder.hasRoomFor(timestamp, ByteBuffer.wrap(key), ByteBuffer.wrap(oversizedValue), Record.EMPTY_HEADERS);
    assertTrue(firstFits, "hasRoomFor() should return true for first record regardless of size when numRecords == 0");

    // Appending the oversized first record must succeed.
    builder.append(timestamp, ByteBuffer.wrap(key), ByteBuffer.wrap(oversizedValue), Record.EMPTY_HEADERS);
    assertEquals(1, builder.numRecords(), "Should have successfully appended the first oversized record");

    // A second oversized record must be rejected now that numRecords > 0.
    boolean secondFits = builder.hasRoomFor(timestamp, ByteBuffer.wrap(key), ByteBuffer.wrap(oversizedValue), Record.EMPTY_HEADERS);
    assertFalse(secondFits, "hasRoomFor() should return false for oversized record when numRecords > 0");

    // Even a record that would normally fit is rejected, because the
    // oversized first record has already consumed the buffer.
    byte[] tinyValue = new byte[100];
    boolean tinyFits = builder.hasRoomFor(timestamp, ByteBuffer.wrap(key), ByteBuffer.wrap(tinyValue), Record.EMPTY_HEADERS);
    assertFalse(tinyFits, "hasRoomFor() should return false for any record when buffer is full from oversized first record");
}

@Test
public void testSplitBatchOffAccumulator() throws InterruptedException {
long seed = System.currentTimeMillis();
Expand Down Expand Up @@ -1790,4 +1828,65 @@ public void testSplitAndReenqueuePreventInfiniteRecursion() throws InterruptedEx
// Verify all original records are accounted for (no data loss)
assertEquals(100, keyFoundMap.size(), "All original 100 records should be present after splitting");
}

@Test
public void testProduceRequestResultAwaitAllDependents() throws Exception {
    ProduceRequestResult parent = new ProduceRequestResult(tp1);

    // Two dependent results standing in for split batches.
    ProduceRequestResult firstDependent = new ProduceRequestResult(tp1);
    ProduceRequestResult secondDependent = new ProduceRequestResult(tp1);

    parent.addDependent(firstDependent);
    parent.addDependent(secondDependent);

    // Complete the parent itself; its own latch is now released.
    parent.set(0L, RecordBatch.NO_TIMESTAMP, null);
    parent.done();

    // completed() inspects only the parent's latch, so it reports true.
    assertTrue(parent.completed(), "Parent should be completed after done()");

    final AtomicBoolean finishedFlag = new AtomicBoolean(false);
    final AtomicReference<Exception> thrownRef = new AtomicReference<>();

    // Run awaitAllDependents() on a separate thread to observe its blocking.
    Thread waiter = new Thread(() -> {
        try {
            parent.awaitAllDependents();
            finishedFlag.set(true);
        } catch (Exception e) {
            thrownRef.set(e);
        }
    });
    waiter.start();
    Thread.sleep(5);

    // Neither dependent is done, so the waiter must still be blocked.
    assertFalse(finishedFlag.get(),
        "awaitAllDependents() should block because dependents are not complete");

    // Finish only the first dependent.
    firstDependent.set(0L, RecordBatch.NO_TIMESTAMP, null);
    firstDependent.done();

    Thread.sleep(5);

    // The second dependent is still outstanding, so the waiter stays blocked.
    assertFalse(finishedFlag.get(),
        "awaitAllDependents() should still block because dependent2 is not complete");

    // Finish the remaining dependent; the waiter can now return.
    secondDependent.set(0L, RecordBatch.NO_TIMESTAMP, null);
    secondDependent.done();

    waiter.join(5000);

    assertNull(thrownRef.get(), "awaitAllDependents() should not throw exception");
    assertTrue(finishedFlag.get(),
        "awaitAllDependents() should complete after all dependents are done");
    assertFalse(waiter.isAlive(), "await thread should have completed");
}
}
Loading