From 6635efbde5815e269fa0d20dffc334fc2567b2a0 Mon Sep 17 00:00:00 2001
From: Artem Livshits
Date: Mon, 6 Nov 2023 09:40:55 -0800
Subject: [PATCH] PROPOSAL: support async event execution in group coordinator

This change fixes a broken abstraction: event execution relies on an
implementation detail of ReplicaManager.appendRecords, namely that with
some arguments it completes synchronously, even though the interface is
clearly asynchronous.

That assumption can be broken by implementation changes, as shown by the
KIP-890 work that added a transaction verification stage which may result
in asynchronous completion (which should be perfectly fine, because the
function interface is asynchronous and must be used as such) and thus
violate the assumption made by event execution.

With this change, event execution supports asynchronous completion and
can properly handle asynchronous completion of the underlying
functionality.
---
 .../group/CoordinatorPartitionWriter.scala    | 45 ++++++++---
 .../group/runtime/CoordinatorEvent.java       | 11 +++
 .../group/runtime/CoordinatorRuntime.java     | 76 ++++++++++++++++---
 .../runtime/MultiThreadedEventProcessor.java  | 28 +++++--
 .../group/runtime/PartitionWriter.java        | 18 +++++
 5 files changed, 151 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index 5ee576ff3c52e..5e2d7063f2501 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.storage.internals.log.AppendOrigin
 
 import java.nio.ByteBuffer
 import java.util
+import java.util.concurrent.CompletableFuture
 import scala.collection.Map
 
 /**
@@ -115,6 +116,23 @@ class CoordinatorPartitionWriter[T](
     tp: TopicPartition,
     records: util.List[T]
   ): Long = {
+    // TODO: avoid blocking here; this parks the thread for the duration of the
+    // asynchronous call, but it is functionally correct.
+    appendAsync(tp, records).get()
+  }
+
+  /**
+   * Write records to the partition. Records are written in one batch so
+   * atomicity is guaranteed.
+   *
+   * @param tp The partition to write records to.
+   * @param records The list of records. The records are written in a single batch.
+   * @return A future holding the log end offset right after the written records.
+   * @throws KafkaException Any KafkaException caught during the write operation.
+   */
+  override def appendAsync(tp: TopicPartition,
+                           records: util.List[T]
+  ): CompletableFuture[Long] = {
     if (records.isEmpty) throw new IllegalStateException("records must be non-empty.")
 
     replicaManager.getLogConfig(tp) match {
@@ -145,28 +163,31 @@ class CoordinatorPartitionWriter[T](
             s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.")
         }
 
-        var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
+        val future = new CompletableFuture[Long]()
+        def responseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
+          val result = responseStatus.get(tp)
+          if (result.isEmpty) {
+            future.completeExceptionally(new IllegalStateException(s"Append status $responseStatus should have partition $tp."))
+          } else if (result.get.error != Errors.NONE) {
+            future.completeExceptionally(result.get.error.exception())
+          } else {
+            // The required offset.
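+            // lastOffset is the offset of the last record in the appended batch, so
+            // lastOffset + 1 is the log end offset right after the written records,
+            // as promised by the appendAsync contract above.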
+            future.complete(result.get.lastOffset + 1)
+          }
+        }
+
         replicaManager.appendRecords(
           timeout = 0L,
           requiredAcks = 1,
           internalTopicsAllowed = true,
           origin = AppendOrigin.COORDINATOR,
           entriesPerPartition = Map(tp -> recordsBuilder.build()),
-          responseCallback = results => appendResults = results,
+          responseCallback = responseCallback,
           // We can directly complete the purgatories here because we don't hold
           // any conflicting locks.
           actionQueue = directActionQueue
         )
-
-        val partitionResult = appendResults.getOrElse(tp,
-          throw new IllegalStateException(s"Append status $appendResults should have partition $tp."))
-
-        if (partitionResult.error != Errors.NONE) {
-          throw partitionResult.error.exception()
-        }
-
-        // Required offset.
-        partitionResult.lastOffset + 1
+        future
 
       case None =>
         throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java
index 19fc0d88cf868..3efc18baa2fe6 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorEvent.java
@@ -18,6 +18,8 @@
 
 import org.apache.kafka.common.TopicPartition;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * The base event type used by all events processed in the
  * coordinator runtime.
@@ -29,6 +31,15 @@ public interface CoordinatorEvent extends EventAccumulator.Event<TopicPartition>
      */
     void run();
 
+    /**
+     * Executes the event asynchronously.
+     *
+     * @return A future that completes when the execution of the event completes.
+     */
+    default CompletableFuture<Void> runAsync() {
+        run();
+        return CompletableFuture.completedFuture(null);
+    }
+
     /**
      * Completes the event with the provided exception.
      *
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index 2f75fa59b5522..5080a36425219 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -47,6 +47,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
@@ -659,9 +660,21 @@ public TopicPartition key() {
          */
         @Override
         public void run() {
+            try {
+                // Block until the asynchronous execution completes; failures are
+                // surfaced through complete().
+                runAsync().get();
+            } catch (Throwable t) {
+                complete(t);
+            }
+        }
+
+        /**
+         * Called by the CoordinatorEventProcessor when the event is executed
+         * asynchronously.
+         */
+        @Override
+        public CompletableFuture<Void> runAsync() {
             try {
                 // Get the context of the coordinator or fail if the coordinator is not in active state.
-                withActiveContextOrThrow(tp, context -> {
+                return withActiveContextOrThrowAsync(tp, context -> {
                     long prevLastWrittenOffset = context.lastWrittenOffset;
 
                     // Execute the operation.
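+                    // The lambda passed to withActiveContextOrThrowAsync returns a future
+                    // that completes only once the whole operation, including the log append
+                    // below, has finished; the old code instead relied on appendRecords
+                    // invoking its response callback synchronously.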
@@ -677,6 +690,7 @@ public void run() {
                             } else {
                                 complete(null);
                             }
+                            return CompletableFuture.completedFuture(null);
                         } else {
                             // If the records are not empty, first, they are applied to the state machine,
                             // second, then are written to the partition/log, and finally, the response
@@ -689,23 +703,31 @@
 
                             // Write the records to the log and update the last written
                             // offset.
-                            long offset = partitionWriter.append(tp, result.records());
-                            context.updateLastWrittenOffset(offset);
-
-                            // Add the response to the deferred queue.
-                            if (!future.isDone()) {
-                                context.deferredEventQueue.add(offset, this);
-                            } else {
-                                complete(null);
-                            }
+                            return partitionWriter.appendAsync(tp, result.records()).thenAccept(offset -> {
+                                context.updateLastWrittenOffset(offset);
+
+                                // Add the response to the deferred queue.
+                                if (!future.isDone()) {
+                                    context.deferredEventQueue.add(offset, this);
+                                } else {
+                                    complete(null);
+                                }
+                            }).whenComplete((none, t) -> {
+                                if (t != null) {
+                                    context.revertLastWrittenOffset(prevLastWrittenOffset);
+                                    complete(t);
+                                }
+                            });
                         }
                     } catch (Throwable t) {
                         context.revertLastWrittenOffset(prevLastWrittenOffset);
                         complete(t);
+                        return CompletableFuture.completedFuture(null);
                     }
                 });
             } catch (Throwable t) {
                 complete(t);
+                return CompletableFuture.completedFuture(null);
             }
         }
 
@@ -1195,6 +1217,40 @@ private void withActiveContextOrThrow(
         }
     }
 
+    /**
+     * Calls the provided function with the context iff the context is active; throws
+     * an exception otherwise. This method ensures that the context lock is acquired
+     * before calling the function and released once the returned future completes.
+     *
+     * @param tp        The topic partition.
+     * @param asyncFunc The function that will receive the context.
+     *
+     * @return The future returned by the provided function.
+     *
+     * @throws NotCoordinatorException
+     * @throws CoordinatorLoadInProgressException
+     */
+    private CompletableFuture<Void> withActiveContextOrThrowAsync(
+        TopicPartition tp,
+        Function<CoordinatorContext, CompletableFuture<Void>> asyncFunc
+    ) throws NotCoordinatorException, CoordinatorLoadInProgressException {
+        CoordinatorContext context = contextOrThrow(tp);
+
+        CompletableFuture<Void> result = null;
+        try {
+            context.lock.lock();
+            if (context.state == CoordinatorState.ACTIVE) {
+                // TODO: holding the lock across an asynchronous call is not a good
+                // practice; the event accumulator should be used for synchronization instead.
+                result = asyncFunc.apply(context).whenComplete((none, t) -> context.lock.unlock());
+                return result;
+            } else if (context.state == CoordinatorState.LOADING) {
+                throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception();
+            } else {
+                throw Errors.NOT_COORDINATOR.exception();
+            }
+        } finally {
+            // If no future was created, the function either did not run or threw,
+            // so release the lock here.
+            if (result == null) {
+                context.lock.unlock();
+            }
+        }
+    }
+
     /**
      * Schedules a write operation.
     *
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
index e4adc18e95793..93b4176899dcd 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/MultiThreadedEventProcessor.java
@@ -24,6 +24,7 @@
 
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -131,17 +132,34 @@ private void handleEvents() {
             CoordinatorEvent event = accumulator.poll();
             recordPollEndTime(time.milliseconds());
             if (event != null) {
+                CompletableFuture<Void> future = null;
                 try {
                     log.debug("Executing event: {}.", event);
                     long dequeuedTimeMs = time.milliseconds();
                     metrics.recordEventQueueTime(dequeuedTimeMs - event.createdTimeMs());
-                    event.run();
-                    metrics.recordEventQueueProcessingTime(time.milliseconds() - dequeuedTimeMs);
+                    future = event.runAsync();
+                    future.whenComplete((none, t) -> {
+                        try {
+                            metrics.recordEventQueueProcessingTime(time.milliseconds() - dequeuedTimeMs);
+                            if (t != null) {
+                                log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t);
+                                event.complete(t);
+                            }
+                        } finally {
+                            accumulator.done(event);
+                        }
+                    });
                 } catch (Throwable t) {
-                    log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t);
-                    event.complete(t);
+                    if (future == null) {
+                        // We failed before we managed to create the future, so complete the event inline.
+                        log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t);
+                        event.complete(t);
+                    }
                 } finally {
-                    accumulator.done(event);
+                    if (future == null) {
+                        // We failed before we managed to create the future, so mark the event done inline.
+                        accumulator.done(event);
+                    }
                 }
             }
         }
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
index 1f8d4119f7ea5..21cebf199e43a 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.TopicPartition;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * A simple interface to write records to Partitions/Logs. It contains the minimum
@@ -92,4 +93,21 @@ long append(
         TopicPartition tp,
         List<T> records
     ) throws KafkaException;
+
+    /**
+     * Write records to the partition. Records are written in one batch so
+     * atomicity is guaranteed.
+     *
+     * @param tp The partition to write records to.
+     * @param records The list of records. The records are written in a single batch.
+     * @return A future holding the log end offset right after the written records.
+     * @throws KafkaException Any KafkaException caught during the write operation.
+     */
+    default CompletableFuture<Long> appendAsync(
+        TopicPartition tp,
+        List<T> records
+    ) throws KafkaException {
+        // By default, bridge to the synchronous append and complete immediately.
+        return CompletableFuture.completedFuture(append(tp, records));
+    }
 }
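
For illustration, here is a minimal, self-contained Java sketch (not part of the patch) of the pattern this change adopts: follow-up work is chained onto the returned CompletableFuture instead of assuming an asynchronous call has already completed when it returns. All names are hypothetical; appendAsync below is a toy stand-in, not the Kafka implementation.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class AsyncAppendSketch {
        // Toy stand-in for PartitionWriter.appendAsync: the future completes on
        // another thread after a delay, much like a transaction verification
        // step can defer completion of ReplicaManager.appendRecords.
        static CompletableFuture<Long> appendAsync(ScheduledExecutorService timer) {
            CompletableFuture<Long> future = new CompletableFuture<>();
            timer.schedule(() -> { future.complete(42L); }, 10, TimeUnit.MILLISECONDS);
            return future;
        }

        public static void main(String[] args) throws Exception {
            ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
            // Correct use of an asynchronous interface: chain the follow-up work
            // (e.g. updating the last written offset and completing the event)
            // rather than expecting the callback to have run synchronously.
            CompletableFuture<Void> done = appendAsync(timer)
                .thenAccept(offset -> System.out.println("appended, next offset: " + offset))
                .whenComplete((none, t) -> {
                    if (t != null) {
                        System.err.println("append failed: " + t);
                    }
                });
            done.get(); // block only at the outermost edge, e.g. in a test
            timer.shutdown();
        }
    }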