diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index 5ee576ff3c52e..5e2d7063f2501 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.storage.internals.log.AppendOrigin
 
 import java.nio.ByteBuffer
 import java.util
+import java.util.concurrent.CompletableFuture
 import scala.collection.Map
 
 /**
@@ -115,6 +116,23 @@ class CoordinatorPartitionWriter[T](
     tp: TopicPartition,
     records: util.List[T]
   ): Long = {
+    // TODO: avoid blocking the thread on the async call here; it is functionally correct but should be replaced.
+    appendAsync(tp, records).get()
+  }
+
+  /**
+   * Write records to the partition. Records are written in one batch so
+   * atomicity is guaranteed.
+   *
+   * @param tp The partition to write records to.
+   * @param records The list of records. The records are written in a single batch.
+   * @return A future holding the log end offset right after the written records.
+   * @throws KafkaException Any KafkaException caught during the write operation.
+   */
+  override def appendAsync(tp: TopicPartition,
+    records: util.List[T]
+  ): CompletableFuture[Long] = {
+
     if (records.isEmpty) throw new IllegalStateException("records must be non-empty.")
 
     replicaManager.getLogConfig(tp) match {
@@ -145,28 +163,31 @@ class CoordinatorPartitionWriter[T](
             s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.")
         }
 
-        var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
+        val future = new CompletableFuture[Long]()
+        def responseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
+          val result = responseStatus.get(tp)
+          if (result.isEmpty) {
+            future.completeExceptionally(new IllegalStateException(s"Append status $responseStatus should have partition $tp."))
+          } else if (result.get.error != Errors.NONE) {
+            future.completeExceptionally(result.get.error.exception())
+          } else {
+            // The required offset.
+            future.complete(result.get.lastOffset + 1)
+          }
+        }
+
         replicaManager.appendRecords(
           timeout = 0L,
           requiredAcks = 1,
           internalTopicsAllowed = true,
           origin = AppendOrigin.COORDINATOR,
           entriesPerPartition = Map(tp -> recordsBuilder.build()),
-          responseCallback = results => appendResults = results,
+          responseCallback = responseCallback,
           // We can directly complete the purgatories here because we don't hold
           // any conflicting locks.
           actionQueue = directActionQueue
         )
-
-        val partitionResult = appendResults.getOrElse(tp,
-          throw new IllegalStateException(s"Append status $appendResults should have partition $tp."))
-
-        if (partitionResult.error != Errors.NONE) {
-          throw partitionResult.error.exception()
-        }
-
-        // Required offset.
-        partitionResult.lastOffset + 1
+        future
 
       case None =>
         throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java
index 19fc0d88cf868..3efc18baa2fe6 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java
@@ -18,6 +18,8 @@
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * The base event type used by all events processed in the
  * coordinator runtime.
@@ -29,6 +31,15 @@ public interface CoordinatorEvent extends EventAccumulator.Event<TopicPartition> {
      */
     void run();
 
+    /**
+     * Executes the event asynchronously.
+     * @return A future that is completed when the event has been executed.
+     */
+    default CompletableFuture<Void> runAsync() {
+        run();
+        return CompletableFuture.completedFuture(null);
+    }
+
     /**
      * Completes the event with the provided exception.
      *
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 2f75fa59b5522..5080a36425219 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -47,6 +47,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
@@ -659,9 +660,21 @@ public TopicPartition key() {
          */
         @Override
         public void run() {
+            try {
+                runAsync().get();
+            } catch (Throwable t) {
+                complete(t);
+            }
+        }
+
+        /**
+         * Called by the CoordinatorEventProcessor when the event is executed asynchronously.
+         */
+        @Override
+        public CompletableFuture<Void> runAsync() {
             try {
                 // Get the context of the coordinator or fail if the coordinator is not in active state.
-                withActiveContextOrThrow(tp, context -> {
+                return withActiveContextOrThrowAsync(tp, context -> {
                     long prevLastWrittenOffset = context.lastWrittenOffset;
 
                     // Execute the operation.
@@ -677,6 +690,7 @@ public void run() {
                        } else {
                            complete(null);
                        }
+                        return CompletableFuture.completedFuture(null);
                    } else {
                        // If the records are not empty, first, they are applied to the state machine,
                        // second, then are written to the partition/log, and finally, the response
@@ -689,23 +703,31 @@ public void run() {
 
                            // Write the records to the log and update the last written
                            // offset.
-                            long offset = partitionWriter.append(tp, result.records());
-                            context.updateLastWrittenOffset(offset);
-
-                            // Add the response to the deferred queue.
-                            if (!future.isDone()) {
-                                context.deferredEventQueue.add(offset, this);
-                            } else {
-                                complete(null);
-                            }
+                            return partitionWriter.appendAsync(tp, result.records()).thenAccept(offset -> {
+                                context.updateLastWrittenOffset(offset);
+
+                                // Add the response to the deferred queue.
+                                if (!future.isDone()) {
+                                    context.deferredEventQueue.add(offset, this);
+                                } else {
+                                    complete(null);
+                                }
+                            }).whenComplete((none, t) -> {
+                                if (t != null) {
+                                    context.revertLastWrittenOffset(prevLastWrittenOffset);
+                                    complete(t);
+                                }
+                            });
                        } catch (Throwable t) {
                            context.revertLastWrittenOffset(prevLastWrittenOffset);
                            complete(t);
+                            return CompletableFuture.completedFuture(null);
                        }
                    }
                });
            } catch (Throwable t) {
                complete(t);
+                return CompletableFuture.completedFuture(null);
            }
        }
 
@@ -1195,6 +1217,40 @@ private void withActiveContextOrThrow(
        }
    }
 
+    /**
+     * Calls the provided function with the context iff the context is active; throws
+     * an exception otherwise. The context lock is acquired before calling the function
+     * and released when the returned future completes, or immediately if no future was created.
+     *
+     * @param tp The topic partition.
+     * @param asyncFunc The function that will receive the context and return a future.
+     * @throws NotCoordinatorException
+     * @throws CoordinatorLoadInProgressException
+     */
+    private CompletableFuture<Void> withActiveContextOrThrowAsync(
+        TopicPartition tp,
+        Function<CoordinatorContext, CompletableFuture<Void>> asyncFunc
+    ) throws NotCoordinatorException, CoordinatorLoadInProgressException {
+        CoordinatorContext context = contextOrThrow(tp);
+
+        CompletableFuture<Void> result = null;
+        try {
+            context.lock.lock();
+            if (context.state == CoordinatorState.ACTIVE) {
+                // TODO: it is not good practice to hold a lock across an async call; we should use the event accumulator to synchronize instead.
+                result = asyncFunc.apply(context).whenComplete((none, t) -> context.lock.unlock());
+                return result;
+            } else if (context.state == CoordinatorState.LOADING) {
+                throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception();
+            } else {
+                throw Errors.NOT_COORDINATOR.exception();
+            }
+        } finally {
+            if (result == null)
+                context.lock.unlock();
+        }
+    }
+
    /**
     * Schedules a write operation.
     *
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
index e4adc18e95793..93b4176899dcd 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
@@ -24,6 +24,7 @@
 
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -131,17 +132,34 @@ private void handleEvents() {
            CoordinatorEvent event = accumulator.poll();
            recordPollEndTime(time.milliseconds());
            if (event != null) {
+                CompletableFuture<Void> future = null;
                try {
                    log.debug("Executing event: {}.", event);
                    long dequeuedTimeMs = time.milliseconds();
                    metrics.recordEventQueueTime(dequeuedTimeMs - event.createdTimeMs());
-                    event.run();
-                    metrics.recordEventQueueProcessingTime(time.milliseconds() - dequeuedTimeMs);
+                    future = event.runAsync();
+                    future.whenComplete((none, t) -> {
+                        try {
+                            metrics.recordEventQueueProcessingTime(time.milliseconds() - dequeuedTimeMs);
+                            if (t != null) {
+                                log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t);
+                                event.complete(t);
+                            }
+                        } finally {
+                            accumulator.done(event);
+                        }
+                    });
                } catch (Throwable t) {
-                    log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t);
-                    event.complete(t);
+                    if (future == null) {
+                        // We failed before we managed to create the future, so do this inline.
+                        log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t);
+                        event.complete(t);
+                    }
                } finally {
-                    accumulator.done(event);
+                    if (future == null) {
+                        // We failed before we managed to create the future, so do this inline.
+                        accumulator.done(event);
+                    }
                }
            }
        }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
index 1f8d4119f7ea5..21cebf199e43a 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
@@ -20,6 +20,8 @@
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 /**
  * A simple interface to write records to Partitions/Logs. It contains the minimum
@@ -92,4 +94,20 @@ long append(
        TopicPartition tp,
        List<T> records
    ) throws KafkaException;
+
+    /**
+     * Write records to the partition. Records are written in one batch so
+     * atomicity is guaranteed.
+     *
+     * @param tp The partition to write records to.
+     * @param records The list of records. The records are written in a single batch.
+     * @return A future holding the log end offset right after the written records.
+     * @throws KafkaException Any KafkaException caught during the write operation.
+     */
+    default CompletableFuture<Long> appendAsync(
+        TopicPartition tp,
+        List<T> records
+    ) throws KafkaException {
+        return CompletableFuture.completedFuture(append(tp, records));
+    }
 }
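
Reviewer note: the contract introduced above is easiest to see in isolation. Below is a minimal, self-contained Java sketch (not part of the patch) of the synchronous-to-asynchronous bridge that the default appendAsync provides, together with the thenAccept/whenComplete chaining pattern that CoordinatorWriteEvent.runAsync now uses. SimplePartitionWriter, InMemoryWriter, and AppendAsyncDemo are hypothetical stand-ins for PartitionWriter and a real log-backed implementation; only the bridging and chaining pattern mirrors the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for PartitionWriter<T>, reduced to the two methods touched by this patch.
interface SimplePartitionWriter<T> {
    // Synchronous append: returns the log end offset right after the written batch.
    long append(String tp, List<T> records);

    // Asynchronous append. The default bridges to the synchronous path, as the
    // default PartitionWriter#appendAsync added in this patch does.
    default CompletableFuture<Long> appendAsync(String tp, List<T> records) {
        return CompletableFuture.completedFuture(append(tp, records));
    }
}

// Hypothetical in-memory implementation, used only to exercise the contract.
class InMemoryWriter implements SimplePartitionWriter<String> {
    private final List<String> log = new ArrayList<>();

    @Override
    public synchronized long append(String tp, List<String> records) {
        if (records.isEmpty()) throw new IllegalStateException("records must be non-empty.");
        log.addAll(records);
        return log.size(); // Log end offset right after the batch.
    }
}

public class AppendAsyncDemo {
    public static void main(String[] args) {
        SimplePartitionWriter<String> writer = new InMemoryWriter();
        // Mirrors CoordinatorWriteEvent.runAsync: react to the offset via thenAccept and
        // route failures through whenComplete instead of a catch block.
        writer.appendAsync("__consumer_offsets-0", List.of("record-1", "record-2"))
            .thenAccept(offset -> System.out.println("Last written offset: " + offset))
            .whenComplete((none, t) -> {
                if (t != null) System.err.println("Append failed: " + t.getMessage());
            });
    }
}

The same inversion happens one level up: MultiThreadedEventProcessor treats the future returned by runAsync as the unit of work, so metrics recording and accumulator.done move into the whenComplete callback, while the blocking .get() calls left in append and run (flagged by the TODOs) keep the old synchronous entry points working during the transition.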