From 948c993fd50d8e1d1101c1f9d744bbcd776b59f6 Mon Sep 17 00:00:00 2001 From: Jeff Kim Date: Thu, 11 May 2023 16:24:30 -0400 Subject: [PATCH 1/4] add purgatory interfaces --- .../group/runtime/PurgatoryScheduler.java | 49 +++++++++++++++++++ .../group/runtime/RebalancePurgatory.java | 40 +++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PurgatoryScheduler.java create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/RebalancePurgatory.java diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PurgatoryScheduler.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PurgatoryScheduler.java new file mode 100644 index 0000000000000..ee893239145c1 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PurgatoryScheduler.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.concurrent.CompletableFuture; + +/** + * A scheduler to add and delete operations from the purgatory. + */ +@InterfaceStability.Unstable +public interface PurgatoryScheduler { + + /** + * Add an operation to the purgatory. + * + * @param key The key to identify this operation. + * @param deadlineMs The deadline in milliseconds. + * @param operation The operation to perform. + */ + void add(String key, long deadlineMs, CompletableFuture operation); + + /** + * Remove an operation with the given key. + * + * @param key The key. + */ + void remove(String key); + + /** + * Shut down the scheduler. + */ + void shutdown(); +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/RebalancePurgatory.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/RebalancePurgatory.java new file mode 100644 index 0000000000000..2cec11228094b --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/RebalancePurgatory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * An interface to complete and expire operations from the purgatory. + */ +@InterfaceStability.Unstable +public interface RebalancePurgatory extends PurgatoryScheduler { + + /** + * Complete the operation corresponding to the given key. + * + * @param key The key. + */ + void complete(String key); + + /** + * Expire the operation corresponding to the given key. + * + * @param key The key. + */ + void expire(String key); +} From 403760f65bdc00b60eebde9c47eefa6903fddde2 Mon Sep 17 00:00:00 2001 From: Jeff Kim Date: Mon, 15 May 2023 10:06:33 -0400 Subject: [PATCH 2/4] address comments --- .../group/runtime/RebalancePurgatory.java | 40 ------------------- .../{PurgatoryScheduler.java => Timer.java} | 22 ++++------ 2 files changed, 8 insertions(+), 54 deletions(-) delete mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/RebalancePurgatory.java rename group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/{PurgatoryScheduler.java => Timer.java} (70%) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/RebalancePurgatory.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/RebalancePurgatory.java deleted file mode 100644 index 2cec11228094b..0000000000000 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/RebalancePurgatory.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.coordinator.group.runtime; - -import org.apache.kafka.common.annotation.InterfaceStability; - -/** - * An interface to complete and expire operations from the purgatory. - */ -@InterfaceStability.Unstable -public interface RebalancePurgatory extends PurgatoryScheduler { - - /** - * Complete the operation corresponding to the given key. - * - * @param key The key. - */ - void complete(String key); - - /** - * Expire the operation corresponding to the given key. - * - * @param key The key. - */ - void expire(String key); -} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PurgatoryScheduler.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Timer.java similarity index 70% rename from group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PurgatoryScheduler.java rename to group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Timer.java index ee893239145c1..0226d386e0221 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PurgatoryScheduler.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Timer.java @@ -18,32 +18,26 @@ import org.apache.kafka.common.annotation.InterfaceStability; -import java.util.concurrent.CompletableFuture; - /** - * A scheduler to add and delete operations from the purgatory. + * An interface to schedule and cancel operations. */ @InterfaceStability.Unstable -public interface PurgatoryScheduler { +public interface Timer { /** - * Add an operation to the purgatory. + * Add an operation to the timer. If an operation with the same key + * already exists, replace it with the new operation. * * @param key The key to identify this operation. - * @param deadlineMs The deadline in milliseconds. + * @param deadlineMs The deadline to expire the operation in milliseconds. * @param operation The operation to perform. */ - void add(String key, long deadlineMs, CompletableFuture operation); + void schedule(String key, long deadlineMs, Runnable operation); /** - * Remove an operation with the given key. + * Remove an operation corresponding to a given key. * * @param key The key. */ - void remove(String key); - - /** - * Shut down the scheduler. - */ - void shutdown(); + void cancel(String key); } From c30052995fb41802227fe8e5ad43e6b718df4b31 Mon Sep 17 00:00:00 2001 From: Jeff Kim Date: Mon, 22 May 2023 12:06:14 -0400 Subject: [PATCH 3/4] address comments --- .../apache/kafka/coordinator/group/runtime/Timer.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Timer.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Timer.java index 0226d386e0221..d423f8ea99ed3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Timer.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Timer.java @@ -16,12 +16,11 @@ */ package org.apache.kafka.coordinator.group.runtime; -import org.apache.kafka.common.annotation.InterfaceStability; +import java.util.concurrent.TimeUnit; /** * An interface to schedule and cancel operations. */ -@InterfaceStability.Unstable public interface Timer { /** @@ -29,10 +28,10 @@ public interface Timer { * already exists, replace it with the new operation. * * @param key The key to identify this operation. - * @param deadlineMs The deadline to expire the operation in milliseconds. - * @param operation The operation to perform. + * @param delay The delay to wait before expiring. + * @param operation The operation to perform upon expiration. */ - void schedule(String key, long deadlineMs, Runnable operation); + void schedule(String key, TimeUnit delay, Runnable operation); /** * Remove an operation corresponding to a given key. From 9be8926092af7f85807673a880a3048c666a79fe Mon Sep 17 00:00:00 2001 From: Jeff Kim Date: Mon, 22 May 2023 12:26:12 -0400 Subject: [PATCH 4/4] address comments --- .../java/org/apache/kafka/coordinator/group/runtime/Timer.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Timer.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Timer.java index d423f8ea99ed3..f6aac982d11e6 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Timer.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/Timer.java @@ -29,9 +29,10 @@ public interface Timer { * * @param key The key to identify this operation. * @param delay The delay to wait before expiring. + * @param unit The delay unit. * @param operation The operation to perform upon expiration. */ - void schedule(String key, TimeUnit delay, Runnable operation); + void schedule(String key, long delay, TimeUnit unit, Runnable operation); /** * Remove an operation corresponding to a given key.