Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.kafka.storage.internals.log.AppendOrigin

import java.nio.ByteBuffer
import java.util
import java.util.concurrent.CompletableFuture
import scala.collection.Map

/**
Expand Down Expand Up @@ -115,6 +116,23 @@ class CoordinatorPartitionWriter[T](
tp: TopicPartition,
records: util.List[T]
): Long = {
// TODO: should avoid this as it holds the thread around async call, but it is functionally correct
appendAsync(tp, records).get();
}

/**
* Write records to the partitions. Records are written in one batch so
* atomicity is guaranteed.
*
* @param tp The partition to write records to.
* @param records The list of records. The records are written in a single batch.
* @return The future log end offset right after the written records.
* @throws KafkaException Any KafkaException caught during the write operation.
*/
override def appendAsync(tp: TopicPartition,
records: util.List[T]
) : CompletableFuture[Long] = {

if (records.isEmpty) throw new IllegalStateException("records must be non-empty.")

replicaManager.getLogConfig(tp) match {
Expand Down Expand Up @@ -145,28 +163,31 @@ class CoordinatorPartitionWriter[T](
s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.")
}

var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
val future = new CompletableFuture[Long]()
def responseCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = {
val result = responseStatus.get(tp)
if (result.isEmpty) {
future.completeExceptionally(new IllegalStateException(s"Append status $responseStatus should have partition $tp."))
} else if (result.get.error != Errors.NONE) {
future.completeExceptionally(result.get.error.exception())
} else {
// The required offset.
future.complete(result.get.lastOffset + 1)
}
}

replicaManager.appendRecords(
timeout = 0L,
requiredAcks = 1,
internalTopicsAllowed = true,
origin = AppendOrigin.COORDINATOR,
entriesPerPartition = Map(tp -> recordsBuilder.build()),
responseCallback = results => appendResults = results,
responseCallback = responseCallback,
// We can directly complete the purgatories here because we don't hold
// any conflicting locks.
actionQueue = directActionQueue
)

val partitionResult = appendResults.getOrElse(tp,
throw new IllegalStateException(s"Append status $appendResults should have partition $tp."))

if (partitionResult.error != Errors.NONE) {
throw partitionResult.error.exception()
}

// Required offset.
partitionResult.lastOffset + 1
future

case None =>
throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.kafka.common.TopicPartition;

import java.util.concurrent.CompletableFuture;

/**
* The base event type used by all events processed in the
* coordinator runtime.
Expand All @@ -29,6 +31,15 @@ public interface CoordinatorEvent extends EventAccumulator.Event<TopicPartition>
*/
void run();

/**
 * Executes the event asynchronously. The default implementation runs the
 * event synchronously via {@link #run()} and returns an already-completed
 * future, so existing synchronous events remain compatible with the
 * asynchronous processing path.
 *
 * NOTE(review): any exception thrown by run() propagates synchronously to
 * the caller here rather than failing the returned future; callers must
 * handle both failure paths.
 *
 * @return a future that is completed once the event has finished executing.
 */
default CompletableFuture<Void> runAsync() {
run();
return CompletableFuture.completedFuture(null);
}

/**
* Completes the event with the provided exception.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* The CoordinatorRuntime provides a framework to implement coordinators such as the group coordinator
Expand Down Expand Up @@ -659,9 +660,21 @@ public TopicPartition key() {
*/
@Override
public void run() {
try {
runAsync().get();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, while this guarantees ordering, it disables pipelining and thus potentially reduces the throughput, since we have to wait for each event's records to be fully replicated before processing the next event.

We probably could introduce a different callback in ReplicaManager.appendRecords that's invoked when the records are appended to the local log.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We actually don't need to wait for replication, so the current pipelining works without changes -- the current logic uses acks=1 and captures the offset and then waits for HWM to be advanced to complete the write request. It may prevent potential pipelining opportunities if new async stages are added for acks=1 (e.g. transaction verification). But the most important thing is that with this proposal, innovating under appendRecords interface would just work out of box, which is the purpose of having interfaces -- innovating under the interface doesn't break callers that use interface correctly (which makes system modular).

If we find out that we want the pipelining for transaction verification we can make this optimization later (if we find it to be a problem). We will have a choice between complexity and potentially better pipelining; with the current model, we don't have the choice -- the workflow will break if we add an async state to acks=1 processing and will have to fix it before shipping.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation, Artem. Yes, it's true that the new group coordinator only depends on acks=1.

Each new group coordinator thread handles requests from multiple groups and multiple clients within the same group. In the proposed approach, if one client's log append is blocked for additional async check, it blocks the processing of other clients and other groups. So, it still seems to reduce the overall throughput somewhat.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if one client's log append is blocked for additional async check

That is correct, it may become a perf problem, we can measure and see if it's worth fixing in practice, we'll have this choice (as well as the choice to postpone the fix, if we have time pressure to release). But it won't be a functional problem. Right now it is a functional problem, which is suboptimal in many ways:

  • appendRecords has async interface, thus adding async stages under such an interface can be done without inspection and understanding all callers (that's what an interface is -- any compliant implementation is valid), but doing so will break the current logic (so from the proper interface usage perspective it is a bug in the caller, which this proposal fixes)
  • we cannot release the new transaction protocol (or the new coordinator) without implementing new logic, which creates hard dependencies and pushes against timelines (now all of a sudden KIP-848 has new work to do before release, just because some independent work is going on in the transaction area)
  • KIP-890 part2 design is still under discussion, the verification protocol is likely to change, so any changes in KIP-890 protocol are going to have ripple effects on KIP-848
  • 2 fairly complex components are now tied together -- we cannot just innovate on transaction protocol implementation details (or to be broader -- on the whole IO subsystem implementation details -- e.g. Async IO) without understanding group coordinator implementation detail and we cannot innovate on group coordinator implementation detail without understanding implementation details of transaction protocol
  • to make the previous point worse, the dependency is not visible at the "point of use" -- someone tasked with improving transaction protocol (or IO in general) would have no indication from the appendRecords interface, that adding an async stage would need to have a corresponding change in group coordinator
  • the work needs to be duplicated in group coordinator (and the protocol is going slightly different for different client versions) which becomes a likely source of bugs

IMO, the fact that transaction verification implementation just doesn't work out-of-box with the new group coordinator (and in fact requires quite non-trivial follow-up work that will block the release) is an architectural issue. We should strive to make the system more decoupled, so that the context an engineer needs to understand to make local changes in a part of system is less.

Each new group coordinator thread handles requests from multiple groups and multiple clients within the same group.

I don't think it's bound to a thread, but indeed the concurrency is limited to partition -- we don't let operations on the same partition run concurrently, so all the groups that are mapped to the same partition are contending. This is, however, a specific implementation choice, it should be possible to make a group to be a unit of concurrency, and if that's not enough, we can let offset commits for different partitions go concurrently as well (they just need to make sure that group doesn't change, which is sort of a "read lock" on the group), at which point there probably wouldn't be any contention in the common path.

Now, one might ask a question: implementing per-group synchronization adds complexity, and handling transaction verification as an explicit state transition in the group coordinator adds complexity — what's the difference? I'd say the difference is fundamental -- per-group synchronization complexity is encapsulated in one component and keeps the system decoupled: an engineer tasked to improve the transaction protocol doesn't need to understand implementation details of the group coordinator and vice versa. Changes are smaller, can be made faster, and are less bug prone. Win-win-win.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking into this. Here is my take:

That is correct, it may become a perf problem

I strongly disagree on blocking the event loop. It will not become a perf problem. It is one. It is also an anti-pattern.

Right now it is a functional problem

It is technically not a functional problem, at least not yet, because I haven't implemented the transactional offset commit in the new coordinator. ;)

appendRecords has async interface, thus adding async stages under such an interface can be done without inspection and understanding all callers (that's what an interface is -- any compliant implementation is valid), but doing so will break the current logic (so from the proper interface usage perspective it is a bug in the caller, which this proposal fixes)

I will change this to not use appendRecords, this will make the contract clear.

now all of a sudden KIP-848 got a new work to do before release, just because there is some independent work is going on in transaction area

This is incorrect. We knew about this and we always had an implementation in mind which works. I will basically decouple the write in two stages: 1) validate/prepare the transaction; and 2) update state and write. As we discussed in the other PR, this is also required for the old coordinator to work correctly.

KIP-890 part2 design is still under discussion, the verification protocol is likely to change, so any changes in KIP-890 protocol are going to have ripple effects on KIP-848

I don't agree with this. As we just saw, we already failed to make it work correctly for the existing coordinator so the dependency was already there. Again, we can do better, I agree.

the work needs to be duplicated in group coordinator (and the protocol is going slightly different for different client versions) which becomes a likely source of bugs

This is completely unrelated in my opinion as this is true for both the old and the new coordinator.

Overall, I agree that we could do better but I think that it is not the right time to change this. We are already under high time pressure and actually changing this in the right way puts even more pressure. We should look for a proper solution afterwards.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is completely unrelated in my opinion as this is true for both the old and the new coordinator.

It's true that it's a problem with the old coordinator, and we should make the whatever minimal fixes required for the old coordinator to work (and if it happens to work end-to-end, which I think it might, we won't need to fix it), but that code is going away and shouldn't define the forward-looking architecture.

As we build the new coordinator, we should build it in a way that improves forward-looking architecture. Keeping the right abstraction is good, coincidentally it helps with the timelines -- we can use this proposal and use the work that already has been done instead of doing new work of bringing implementation details into group coordinator.

Moreover, I wonder if we need yet another thread pool to handle group coordinator logic, I think it would be good to just re-use the request handler threads to run this functionality. This would avoid thread pools proliferation and also reuse various useful improvements that work only on request pool threads, e.g. RequestLocal (hopefully we'll make it into a real thread local to be used at the point of use instead of passing the argument), various observability things, etc. Here is a PoC that does that using NonBlockingSynchronizer and KafkaRequestHandler.wrap

46acf02

The NonBlockingSynchronizer replaces EventAccumulator and MultiThreadedEventProcessor (I didn't remove them to keep the change small), it has some perf benefits e.g. in uncontended cases, the processing continues running on the request thread instead of being rescheduled on the gc thread pool. I can also easily implement read-write synchronization for the NonBlockingSynchronizer (so that readers won't block each other out), e.g. to implement non-blocking read "lock" on group when committing offsets.

It's not to say I don't like the current code, but it feels like we're re-building functionality that we already have elsewhere in Kafka, and we could re-use the existing building blocks so that the gc focuses on group coordination rather than managing thread pools, getting into the details of the transactional protocol, etc.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be honest, if we implement proper concurrency granularity for groups (serialize group updates [not whole partition], keep read "lock" on groups during commit updates) I'm not sure if we'd get much extra perf gain from piercing the appendRecords abstraction to implement pipelining. Then we could get rid of the timeline snapshot structure and hooking into replication pipeline to listen for HWM updates; we could just do appendRecords and wait for completion. Then we could completely decouple group coordinator logic from the storage stack and make it simpler.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another potential issue is the ordering. withActiveContextOrThrow holds a partition level lock to make sure the record is replayed in the state machine in the same order as it's appended to the log. With withActiveContextOrThrowAsync, we hold the lock to replay the record, but appends to the log without the lock. The could create a situation that the state machine may not be exactly recreated by replaying records from the log.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The locking model is not changed -- it holds the lock around the whole call, see line 1241

result = asyncFunc.apply(context).whenComplete((none, t) -> context.lock.unlock());

the .whenComplete callback will execute after the function is complete, so lock is held around the whole thing.

The unlock in the finally clause is so that if we asyncFunc.apply throws an exception (which would happen if the function in fact is executed synchronously) and we didn't get the future, then we unlock inline.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Artem. Got it.

} catch (Throwable t) {
complete(t);
}
}

/**
* Called by the CoordinatorEventProcessor when the event is executed.
*/
@Override
public CompletableFuture<Void> runAsync() {
try {
// Get the context of the coordinator or fail if the coordinator is not in active state.
withActiveContextOrThrow(tp, context -> {
return withActiveContextOrThrowAsync(tp, context -> {
long prevLastWrittenOffset = context.lastWrittenOffset;

// Execute the operation.
Expand All @@ -677,6 +690,7 @@ public void run() {
} else {
complete(null);
}
return CompletableFuture.completedFuture(null);
} else {
// If the records are not empty, first, they are applied to the state machine,
// second, then are written to the partition/log, and finally, the response
Expand All @@ -689,23 +703,31 @@ public void run() {

// Write the records to the log and update the last written
// offset.
long offset = partitionWriter.append(tp, result.records());
context.updateLastWrittenOffset(offset);

// Add the response to the deferred queue.
if (!future.isDone()) {
context.deferredEventQueue.add(offset, this);
} else {
complete(null);
}
return partitionWriter.appendAsync(tp, result.records()).thenAccept(offset -> {
context.updateLastWrittenOffset(offset);

// Add the response to the deferred queue.
if (!future.isDone()) {
context.deferredEventQueue.add(offset, this);
} else {
complete(null);
}
}).whenComplete((none, t) -> {
if (t != null) {
context.revertLastWrittenOffset(prevLastWrittenOffset);
complete(t);
}
});
} catch (Throwable t) {
context.revertLastWrittenOffset(prevLastWrittenOffset);
complete(t);
return CompletableFuture.completedFuture(null);
}
}
});
} catch (Throwable t) {
complete(t);
return CompletableFuture.completedFuture(null);
}
}

Expand Down Expand Up @@ -1195,6 +1217,40 @@ private void withActiveContextOrThrow(
}
}

/**
 * Calls the provided asynchronous function with the context iff the context
 * is active; throws an exception otherwise. The context lock is acquired
 * before calling the function and is released when the returned future
 * completes (or released inline if the function could not be invoked).
 *
 * @param tp        The topic partition.
 * @param asyncFunc The function that will receive the context and return a
 *                  future tracking the asynchronous work.
 * @return The future returned by {@code asyncFunc}; the context lock is
 *         released when it completes.
 * @throws NotCoordinatorException If the coordinator is not active for this partition.
 * @throws CoordinatorLoadInProgressException If the coordinator is still loading.
 */
private CompletableFuture<Void> withActiveContextOrThrowAsync(
    TopicPartition tp,
    Function<CoordinatorContext, CompletableFuture<Void>> asyncFunc
) throws NotCoordinatorException, CoordinatorLoadInProgressException {
    CoordinatorContext context = contextOrThrow(tp);

    // Acquire the lock BEFORE entering the try block. If lock() were called
    // inside the try and failed, the finally clause would attempt to unlock
    // a lock this thread does not hold.
    context.lock.lock();
    CompletableFuture<Void> result = null;
    try {
        if (context.state == CoordinatorState.ACTIVE) {
            // TODO: it's not a good practice to hold a lock around async calls,
            // should use the event accumulator to synchronize.
            // The unlock runs in whenComplete, so the lock is held for the
            // whole asynchronous operation.
            result = asyncFunc.apply(context).whenComplete((none, t) -> context.lock.unlock());
            return result;
        } else if (context.state == CoordinatorState.LOADING) {
            throw Errors.COORDINATOR_LOAD_IN_PROGRESS.exception();
        } else {
            throw Errors.NOT_COORDINATOR.exception();
        }
    } finally {
        // If asyncFunc threw, or a state check failed, no completion callback
        // was registered, so release the lock inline.
        if (result == null)
            context.lock.unlock();
    }
}

/**
* Schedules a write operation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -131,17 +132,34 @@ private void handleEvents() {
CoordinatorEvent event = accumulator.poll();
recordPollEndTime(time.milliseconds());
if (event != null) {
CompletableFuture<Void> future = null;
try {
log.debug("Executing event: {}.", event);
long dequeuedTimeMs = time.milliseconds();
metrics.recordEventQueueTime(dequeuedTimeMs - event.createdTimeMs());
event.run();
metrics.recordEventQueueProcessingTime(time.milliseconds() - dequeuedTimeMs);
future = event.runAsync();
future.whenComplete((none, t) -> {
try {
metrics.recordEventQueueProcessingTime(time.milliseconds() - dequeuedTimeMs);
if (t != null) {
log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t);
event.complete(t);
}
} finally {
accumulator.done(event);
}
});
} catch (Throwable t) {
log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t);
event.complete(t);
if (future == null) {
// We failed before we managed to create the future, so do this inline.
log.error("Failed to run event {} due to: {}.", event, t.getMessage(), t);
event.complete(t);
}
} finally {
accumulator.done(event);
if (future == null) {
// We failed before we managed to create the future, so do this inline.
accumulator.done(event);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.kafka.common.TopicPartition;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
* A simple interface to write records to Partitions/Logs. It contains the minimum
Expand Down Expand Up @@ -92,4 +94,20 @@ long append(
TopicPartition tp,
List<T> records
) throws KafkaException;

/**
 * Write records to the partitions asynchronously. Records are written in one
 * batch so atomicity is guaranteed.
 *
 * The default implementation delegates to the synchronous
 * {@link #append(TopicPartition, List)} and wraps its result in an
 * already-completed future, so implementations that have not provided a
 * truly asynchronous override keep working unchanged.
 *
 * NOTE(review): with this default, a KafkaException from append(...) is
 * thrown synchronously to the caller instead of failing the returned
 * future; callers must handle both failure paths.
 *
 * @param tp The partition to write records to.
 * @param records The list of records. The records are written in a single batch.
 * @return The future log end offset right after the written records.
 * @throws KafkaException Any KafkaException caught during the write operation.
 */
default CompletableFuture<Long> appendAsync(
TopicPartition tp,
List<T> records
) throws KafkaException {
return CompletableFuture.completedFuture(append(tp, records));
}
}