diff --git a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java index db8ccbff19a..3efff7a2151 100644 --- a/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java +++ b/bookkeeper-benchmark/src/main/java/org/apache/bookkeeper/benchmark/BenchBookie.java @@ -31,6 +31,7 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClient; @@ -38,7 +39,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.ByteBufList; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; @@ -165,7 +165,7 @@ public static void main(String[] args) eventLoop = new NioEventLoopGroup(); } - OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder() + OrderedExecutor executor = OrderedExecutor.newBuilder() .name("BenchBookieClientScheduler") .numThreads(1) .build(); diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java new file mode 100644 index 00000000000..655d49de716 --- /dev/null +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.common.util; + +import com.google.common.util.concurrent.ForwardingExecutorService; + +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Implements {@link ExecutorService} and allows limiting the number of tasks to + * be scheduled in the thread's queue. + */ +public class BoundedExecutorService extends ForwardingExecutorService { + private final BlockingQueue queue; + private final ThreadPoolExecutor thread; + private final int maxTasksInQueue; + + public BoundedExecutorService(ThreadPoolExecutor thread, int maxTasksInQueue) { + this.queue = thread.getQueue(); + this.thread = thread; + this.maxTasksInQueue = maxTasksInQueue; + } + + @Override + protected ExecutorService delegate() { + return this.thread; + } + + private void checkQueue(int numberOfTasks) { + if (maxTasksInQueue > 0 && (queue.size() + numberOfTasks) > maxTasksInQueue) { + throw new RejectedExecutionException("Queue at limit of " + maxTasksInQueue + " items"); + } + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + checkQueue(tasks.size()); + return super.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + checkQueue(tasks.size()); + return super.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + checkQueue(tasks.size()); + return super.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + checkQueue(tasks.size()); + return super.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + checkQueue(1); + super.execute(command); + } + + @Override + public Future submit(Callable task) { + checkQueue(1); + return super.submit(task); + } + + @Override + public Future submit(Runnable task) { + checkQueue(1); + return super.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + checkQueue(1); + return super.submit(task, result); + } +} diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java index 482ca189860..44c6f38d279 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java @@ -59,83 +59,83 @@ protected ListeningExecutorService delegate() { @Override public ListenableScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - this.checkQueue(); + this.checkQueue(1); return this.thread.schedule(command, delay, unit); } @Override public ListenableScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - this.checkQueue(); + this.checkQueue(1); return this.thread.schedule(callable, delay, unit); } @Override public ListenableScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - this.checkQueue(); + this.checkQueue(1); return this.thread.scheduleAtFixedRate(command, initialDelay, period, unit); } @Override public ListenableScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - this.checkQueue(); + this.checkQueue(1); return this.thread.scheduleAtFixedRate(command, initialDelay, delay, unit); } @Override public ListenableFuture submit(Callable task) { - this.checkQueue(); + this.checkQueue(1); return super.submit(task); } @Override public ListenableFuture submit(Runnable task) { - this.checkQueue(); + this.checkQueue(1); return super.submit(task); } @Override public List> invokeAll(Collection> tasks) throws InterruptedException { - this.checkQueue(); + this.checkQueue(tasks.size()); return super.invokeAll(tasks); } @Override public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - this.checkQueue(); + this.checkQueue(tasks.size()); return super.invokeAll(tasks, timeout, unit); } @Override public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - this.checkQueue(); + this.checkQueue(tasks.size()); return super.invokeAny(tasks); } @Override public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - this.checkQueue(); + this.checkQueue(tasks.size()); return super.invokeAny(tasks, timeout, unit); } @Override public ListenableFuture submit(Runnable task, T result) { - this.checkQueue(); + this.checkQueue(1); return super.submit(task, result); } @Override public void execute(Runnable command) { - this.checkQueue(); + this.checkQueue(1); super.execute(command); } - private void checkQueue() { - if (this.maxTasksInQueue > 0 && this.queue.size() >= this.maxTasksInQueue) { - throw new RejectedExecutionException("Queue at limit of " + this.maxTasksInQueue + " items"); + private void checkQueue(int numberOfTasks) { + if (maxTasksInQueue > 0 && (queue.size() + numberOfTasks) > maxTasksInQueue) { + throw new RejectedExecutionException("Queue at limit of " + maxTasksInQueue + " items"); } } diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java new file mode 100644 index 00000000000..0f634d0bb9e --- /dev/null +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedExecutor.java @@ -0,0 +1,524 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.common.util; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import io.netty.util.concurrent.DefaultThreadFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.commons.lang.StringUtils; + +/** + * This class provides 2 things over the java {@link ExecutorService}. + * + *

1. It takes {@link SafeRunnable objects} instead of plain Runnable objects. + * This means that exceptions in scheduled tasks wont go unnoticed and will be + * logged. + * + *

2. It supports submitting tasks with an ordering key, so that tasks submitted + * with the same key will always be executed in order, but tasks across + * different keys can be unordered. This retains parallelism while retaining the + * basic amount of ordering we want (e.g. , per ledger handle). Ordering is + * achieved by hashing the key objects to threads by their {@link #hashCode()} + * method. + */ +@Slf4j +public class OrderedExecutor implements ExecutorService { + public static final int NO_TASK_LIMIT = -1; + protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1); + + final String name; + final ExecutorService threads[]; + final long threadIds[]; + final Random rand = new Random(); + final OpStatsLogger taskExecutionStats; + final OpStatsLogger taskPendingStats; + final boolean traceTaskExecution; + final long warnTimeMicroSec; + final int maxTasksInQueue; + + + public static Builder newBuilder() { + return new Builder(); + } + + /** + * A builder class for an OrderedExecutor. + */ + public static class Builder extends AbstractBuilder { + + @Override + public OrderedExecutor build() { + if (null == threadFactory) { + threadFactory = new DefaultThreadFactory("bookkeeper-ordered-safe-executor"); + } + return new OrderedExecutor(name, numThreads, threadFactory, statsLogger, + traceTaskExecution, warnTimeMicroSec, maxTasksInQueue); + } + } + + /** + * Abstract builder class to build {@link OrderedScheduler}. + */ + public abstract static class AbstractBuilder { + protected String name = getClass().getSimpleName(); + protected int numThreads = Runtime.getRuntime().availableProcessors(); + protected ThreadFactory threadFactory = null; + protected StatsLogger statsLogger = NullStatsLogger.INSTANCE; + protected boolean traceTaskExecution = false; + protected long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT; + protected int maxTasksInQueue = NO_TASK_LIMIT; + + public AbstractBuilder name(String name) { + this.name = name; + return this; + } + + public AbstractBuilder numThreads(int num) { + this.numThreads = num; + return this; + } + + public AbstractBuilder maxTasksInQueue(int num) { + this.maxTasksInQueue = num; + return this; + } + + public AbstractBuilder threadFactory(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; + return this; + } + + public AbstractBuilder statsLogger(StatsLogger statsLogger) { + this.statsLogger = statsLogger; + return this; + } + + public AbstractBuilder traceTaskExecution(boolean enabled) { + this.traceTaskExecution = enabled; + return this; + } + + public AbstractBuilder traceTaskWarnTimeMicroSec(long warnTimeMicroSec) { + this.warnTimeMicroSec = warnTimeMicroSec; + return this; + } + + @SuppressWarnings("unchecked") + public T build() { + if (null == threadFactory) { + threadFactory = new DefaultThreadFactory(name); + } + return (T) new OrderedExecutor( + name, + numThreads, + threadFactory, + statsLogger, + traceTaskExecution, + warnTimeMicroSec, + maxTasksInQueue); + } + } + + /** + * Decorator class for a runnable that measure the execution time. + */ + protected class TimedRunnable implements Runnable { + final Runnable runnable; + final long initNanos; + + TimedRunnable(Runnable runnable) { + this.runnable = runnable; + this.initNanos = MathUtils.nowInNano(); + } + + @Override + public void run() { + taskPendingStats.registerSuccessfulEvent(MathUtils.elapsedNanos(initNanos), TimeUnit.NANOSECONDS); + long startNanos = MathUtils.nowInNano(); + this.runnable.run(); + long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos); + taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS); + if (elapsedMicroSec >= warnTimeMicroSec) { + log.warn("Runnable {}:{} took too long {} micros to execute.", runnable, runnable.getClass(), + elapsedMicroSec); + } + } + } + + protected ThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) { + return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), factory); + } + + protected ExecutorService getBoundedExecutor(ThreadPoolExecutor executor) { + return new BoundedExecutorService(executor, this.maxTasksInQueue); + } + + /** + * Constructs Safe executor. + * + * @param numThreads + * - number of threads + * @param baseName + * - base name of executor threads + * @param threadFactory + * - for constructing threads + * @param statsLogger + * - for reporting executor stats + * @param traceTaskExecution + * - should we stat task execution + * @param warnTimeMicroSec + * - log long task exec warning after this interval + * @param maxTasksInQueue + * - maximum items allowed in a thread queue. -1 for no limit + */ + protected OrderedExecutor(String baseName, int numThreads, ThreadFactory threadFactory, + StatsLogger statsLogger, boolean traceTaskExecution, + long warnTimeMicroSec, int maxTasksInQueue) { + checkArgument(numThreads > 0); + checkArgument(!StringUtils.isBlank(baseName)); + + this.maxTasksInQueue = maxTasksInQueue; + this.warnTimeMicroSec = warnTimeMicroSec; + name = baseName; + threads = new ExecutorService[numThreads]; + threadIds = new long[numThreads]; + for (int i = 0; i < numThreads; i++) { + ThreadPoolExecutor thread = createSingleThreadExecutor( + new ThreadFactoryBuilder().setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d") + .setThreadFactory(threadFactory).build()); + threads[i] = getBoundedExecutor(thread); + + final int idx = i; + try { + threads[idx].submit(() -> { + threadIds[idx] = Thread.currentThread().getId(); + }).get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Couldn't start thread " + i, e); + } catch (ExecutionException e) { + throw new RuntimeException("Couldn't start thread " + i, e); + } + + // Register gauges + statsLogger.registerGauge(String.format("%s-queue-%d", name, idx), new Gauge() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return thread.getQueue().size(); + } + }); + statsLogger.registerGauge(String.format("%s-completed-tasks-%d", name, idx), new Gauge() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return thread.getCompletedTaskCount(); + } + }); + statsLogger.registerGauge(String.format("%s-total-tasks-%d", name, idx), new Gauge() { + @Override + public Number getDefaultValue() { + return 0; + } + + @Override + public Number getSample() { + return thread.getTaskCount(); + } + }); + } + + // Stats + this.taskExecutionStats = statsLogger.scope(name).getOpStatsLogger("task_execution"); + this.taskPendingStats = statsLogger.scope(name).getOpStatsLogger("task_queued"); + this.traceTaskExecution = traceTaskExecution; + } + + /** + * Schedules a one time action to execute with an ordering guarantee on the key. + * @param orderingKey + * @param r + */ + public void executeOrdered(Object orderingKey, SafeRunnable r) { + chooseThread(orderingKey).execute(timedRunnable(r)); + } + + /** + * Schedules a one time action to execute with an ordering guarantee on the key. + * @param orderingKey + * @param r + */ + public void executeOrdered(long orderingKey, SafeRunnable r) { + chooseThread(orderingKey).execute(timedRunnable(r)); + } + + /** + * Schedules a one time action to execute with an ordering guarantee on the key. + * @param orderingKey + * @param r + */ + public void executeOrdered(int orderingKey, SafeRunnable r) { + chooseThread(orderingKey).execute(timedRunnable(r)); + } + + public ListenableFuture submitOrdered(long orderingKey, Callable task) { + SettableFuture future = SettableFuture.create(); + executeOrdered(orderingKey, () -> { + try { + T result = task.call(); + future.set(result); + } catch (Throwable t) { + future.setException(t); + } + }); + + return future; + } + + + public long getThreadID(long orderingKey) { + // skip hashcode generation in this special case + if (threadIds.length == 1) { + return threadIds[0]; + } + + return threadIds[MathUtils.signSafeMod(orderingKey, threadIds.length)]; + } + + public ExecutorService chooseThread() { + // skip random # generation in this special case + if (threads.length == 1) { + return threads[0]; + } + + return threads[rand.nextInt(threads.length)]; + } + + public ExecutorService chooseThread(Object orderingKey) { + // skip hashcode generation in this special case + if (threads.length == 1) { + return threads[0]; + } + + return threads[MathUtils.signSafeMod(orderingKey.hashCode(), threads.length)]; + } + + /** + * skip hashcode generation in this special case. + * + * @param orderingKey long ordering key + * @return the thread for executing this order key + */ + public ExecutorService chooseThread(long orderingKey) { + if (threads.length == 1) { + return threads[0]; + } + + return threads[MathUtils.signSafeMod(orderingKey, threads.length)]; + } + + private Runnable timedRunnable(Runnable r) { + if (traceTaskExecution) { + return new TimedRunnable(r); + } else { + return r; + } + } + + /** + * {@inheritDoc} + */ + @Override + public Future submit(Callable task) { + return chooseThread().submit(task); + } + + /** + * {@inheritDoc} + */ + @Override + public Future submit(Runnable task, T result) { + return chooseThread().submit(timedRunnable(task), result); + } + + /** + * {@inheritDoc} + */ + @Override + public Future submit(Runnable task) { + return chooseThread().submit(timedRunnable(task)); + } + + /** + * {@inheritDoc} + */ + @Override + public List> invokeAll(Collection> tasks) + throws InterruptedException { + return chooseThread().invokeAll(tasks); + } + + /** + * {@inheritDoc} + */ + @Override + public List> invokeAll(Collection> tasks, + long timeout, + TimeUnit unit) + throws InterruptedException { + return chooseThread().invokeAll(tasks, timeout, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public T invokeAny(Collection> tasks) + throws InterruptedException, ExecutionException { + return chooseThread().invokeAny(tasks); + } + + /** + * {@inheritDoc} + */ + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + return chooseThread().invokeAny(tasks, timeout, unit); + } + + /** + * {@inheritDoc} + */ + @Override + public void execute(Runnable command) { + chooseThread().execute(command); + } + + + /** + * {@inheritDoc} + */ + @Override + public void shutdown() { + for (int i = 0; i < threads.length; i++) { + threads[i].shutdown(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public List shutdownNow() { + List runnables = new ArrayList(); + for (ExecutorService executor : threads) { + runnables.addAll(executor.shutdownNow()); + } + return runnables; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isShutdown() { + for (ExecutorService executor : threads) { + if (!executor.isShutdown()) { + return false; + } + } + return true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + boolean ret = true; + for (int i = 0; i < threads.length; i++) { + ret = ret && threads[i].awaitTermination(timeout, unit); + } + return ret; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isTerminated() { + for (ExecutorService executor : threads) { + if (!executor.isTerminated()) { + return false; + } + } + return true; + } + + /** + * Force threads shutdown (cancel active requests) after specified delay, + * to be used after shutdown() rejects new requests. + */ + public void forceShutdown(long timeout, TimeUnit unit) { + for (int i = 0; i < threads.length; i++) { + try { + if (!threads[i].awaitTermination(timeout, unit)) { + threads[i].shutdownNow(); + } + } catch (InterruptedException exception) { + threads[i].shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + +} diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java index bf2a6fbc8e1..6f05832b9d5 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/OrderedScheduler.java @@ -17,32 +17,20 @@ */ package org.apache.bookkeeper.common.util; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.util.concurrent.DefaultThreadFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Random; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.OpStatsLogger; + import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.commons.lang.StringUtils; /** * This class provides 2 things over the java {@link ScheduledExecutorService}. @@ -58,19 +46,7 @@ * achieved by hashing the key objects to threads by their {@link #hashCode()} * method. */ -public class OrderedScheduler implements ScheduledExecutorService { - public static final int NO_TASK_LIMIT = -1; - protected static final long WARN_TIME_MICRO_SEC_DEFAULT = TimeUnit.SECONDS.toMicros(1); - - final String name; - final ListeningScheduledExecutorService threads[]; - final long threadIds[]; - final Random rand = new Random(); - final OpStatsLogger taskExecutionStats; - final OpStatsLogger taskPendingStats; - final boolean traceTaskExecution; - final long warnTimeMicroSec; - final int maxTasksInQueue; +public class OrderedScheduler extends OrderedExecutor implements ScheduledExecutorService { /** * Create a builder to build ordered scheduler. @@ -84,61 +60,13 @@ public static SchedulerBuilder newSchedulerBuilder() { /** * Builder to build ordered scheduler. */ - public static class SchedulerBuilder extends AbstractBuilder {} - - /** - * Abstract builder class to build {@link OrderedScheduler}. - */ - public abstract static class AbstractBuilder { - protected String name = getClass().getSimpleName(); - protected int numThreads = Runtime.getRuntime().availableProcessors(); - protected ThreadFactory threadFactory = null; - protected StatsLogger statsLogger = NullStatsLogger.INSTANCE; - protected boolean traceTaskExecution = false; - protected long warnTimeMicroSec = WARN_TIME_MICRO_SEC_DEFAULT; - protected int maxTasksInQueue = NO_TASK_LIMIT; - - public AbstractBuilder name(String name) { - this.name = name; - return this; - } - - public AbstractBuilder numThreads(int num) { - this.numThreads = num; - return this; - } - - public AbstractBuilder maxTasksInQueue(int num) { - this.maxTasksInQueue = num; - return this; - } - - public AbstractBuilder threadFactory(ThreadFactory threadFactory) { - this.threadFactory = threadFactory; - return this; - } - - public AbstractBuilder statsLogger(StatsLogger statsLogger) { - this.statsLogger = statsLogger; - return this; - } - - public AbstractBuilder traceTaskExecution(boolean enabled) { - this.traceTaskExecution = enabled; - return this; - } - - public AbstractBuilder traceTaskWarnTimeMicroSec(long warnTimeMicroSec) { - this.warnTimeMicroSec = warnTimeMicroSec; - return this; - } - - @SuppressWarnings("unchecked") - public T build() { + public static class SchedulerBuilder extends OrderedExecutor.AbstractBuilder { + @Override + public OrderedScheduler build() { if (null == threadFactory) { threadFactory = new DefaultThreadFactory(name); } - return (T) new OrderedScheduler( + return new OrderedScheduler( name, numThreads, threadFactory, @@ -147,32 +75,6 @@ public T build() { warnTimeMicroSec, maxTasksInQueue); } - - } - - private class TimedRunnable implements SafeRunnable { - final SafeRunnable runnable; - final long initNanos; - - TimedRunnable(SafeRunnable runnable) { - this.runnable = runnable; - this.initNanos = MathUtils.nowInNano(); - } - - @Override - public void safeRun() { - taskPendingStats.registerSuccessfulEvent( - MathUtils.elapsedNanos(initNanos), - TimeUnit.NANOSECONDS); - long startNanos = MathUtils.nowInNano(); - this.runnable.safeRun(); - long elapsedMicroSec = MathUtils.elapsedMicroSec(startNanos); - taskExecutionStats.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS); - if (elapsedMicroSec >= warnTimeMicroSec) { - LOGGER.warn("Runnable {}:{} took too long {} micros to execute.", - runnable, runnable.getClass(), elapsedMicroSec); - } - } } /** @@ -191,172 +93,40 @@ public void safeRun() { * @param warnTimeMicroSec * - log long task exec warning after this interval */ - protected OrderedScheduler(String baseName, + private OrderedScheduler(String baseName, int numThreads, ThreadFactory threadFactory, StatsLogger statsLogger, boolean traceTaskExecution, long warnTimeMicroSec, int maxTasksInQueue) { - checkArgument(numThreads > 0); - checkArgument(!StringUtils.isBlank(baseName)); - - this.maxTasksInQueue = maxTasksInQueue; - this.warnTimeMicroSec = warnTimeMicroSec; - name = baseName; - threads = new ListeningScheduledExecutorService[numThreads]; - threadIds = new long[numThreads]; - for (int i = 0; i < numThreads; i++) { - final ScheduledThreadPoolExecutor thread = new ScheduledThreadPoolExecutor(1, - new ThreadFactoryBuilder() - .setNameFormat(name + "-" + getClass().getSimpleName() + "-" + i + "-%d") - .setThreadFactory(threadFactory) - .build()); - threads[i] = new BoundedScheduledExecutorService(thread, this.maxTasksInQueue); - - final int idx = i; - try { - threads[idx].submit(new SafeRunnable() { - @Override - public void safeRun() { - threadIds[idx] = Thread.currentThread().getId(); - } - }).get(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException("Couldn't start thread " + i, e); - } catch (ExecutionException e) { - throw new RuntimeException("Couldn't start thread " + i, e); - } - - // Register gauges - statsLogger.registerGauge(String.format("%s-queue-%d", name, idx), new Gauge() { - @Override - public Number getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return thread.getQueue().size(); - } - }); - statsLogger.registerGauge(String.format("%s-completed-tasks-%d", name, idx), new Gauge() { - @Override - public Number getDefaultValue() { - return 0; - } + super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution, warnTimeMicroSec, maxTasksInQueue); + } - @Override - public Number getSample() { - return thread.getCompletedTaskCount(); - } - }); - statsLogger.registerGauge(String.format("%s-total-tasks-%d", name, idx), new Gauge() { - @Override - public Number getDefaultValue() { - return 0; - } - @Override - public Number getSample() { - return thread.getTaskCount(); - } - }); - } + @Override + protected ScheduledThreadPoolExecutor createSingleThreadExecutor(ThreadFactory factory) { + return new ScheduledThreadPoolExecutor(1, factory); + } - // Stats - this.taskExecutionStats = statsLogger.scope(name).getOpStatsLogger("task_execution"); - this.taskPendingStats = statsLogger.scope(name).getOpStatsLogger("task_queued"); - this.traceTaskExecution = traceTaskExecution; + @Override + protected ListeningScheduledExecutorService getBoundedExecutor(ThreadPoolExecutor executor) { + return new BoundedScheduledExecutorService((ScheduledThreadPoolExecutor) executor, this.maxTasksInQueue); } + @Override public ListeningScheduledExecutorService chooseThread() { - // skip random # generation in this special case - if (threads.length == 1) { - return threads[0]; - } - - return threads[rand.nextInt(threads.length)]; + return (ListeningScheduledExecutorService) super.chooseThread(); } + @Override public ListeningScheduledExecutorService chooseThread(Object orderingKey) { - // skip hashcode generation in this special case - if (threads.length == 1) { - return threads[0]; - } - - return threads[MathUtils.signSafeMod(orderingKey.hashCode(), threads.length)]; + return (ListeningScheduledExecutorService) super.chooseThread(orderingKey); } - /** - * skip hashcode generation in this special case. - * - * @param orderingKey long ordering key - * @return the thread for executing this order key - */ + @Override public ListeningScheduledExecutorService chooseThread(long orderingKey) { - if (threads.length == 1) { - return threads[0]; - } - - return threads[MathUtils.signSafeMod(orderingKey, threads.length)]; - } - - private SafeRunnable timedRunnable(SafeRunnable r) { - if (traceTaskExecution) { - return new TimedRunnable(r); - } else { - return r; - } - } - - /** - * schedules a one time action to execute. - */ - public void submit(SafeRunnable r) { - chooseThread().submit(timedRunnable(r)); - } - - /** - * schedules a one time action to execute with an ordering guarantee on the orderingKey. - * - * @param orderingKey order key to submit the task - * @param r task to run - * @return listenable future on the completion of the task - */ - public ListenableFuture submitOrdered(Object orderingKey, SafeRunnable r) { - return chooseThread(orderingKey).submit(timedRunnable(r)); - } - - /** - * schedules a one time action to execute with an ordering guarantee on the orderingKey. - * - * @param orderingKey order key to submit the task - * @param r task to run - */ - public void executeOrdered(Object orderingKey, SafeRunnable r) { - chooseThread(orderingKey).execute(timedRunnable(r)); - } - - /** - * schedules a one time action to execute with an ordering guarantee on the key. - * - * @param orderingKey - * @param r - */ - public void submitOrdered(long orderingKey, SafeRunnable r) { - chooseThread(orderingKey).execute(timedRunnable(r)); - } - - /** - * schedules a one time action to execute with an ordering guarantee on the key. - * - * @param orderingKey - * @param r - */ - public void submitOrdered(int orderingKey, SafeRunnable r) { - chooseThread(orderingKey).execute(timedRunnable(r)); + return (ListeningScheduledExecutorService) super.chooseThread(orderingKey); } /** @@ -472,91 +242,6 @@ public ScheduledFuture scheduleWithFixedDelayOrdered(Object orderingKey, Safe return chooseThread(orderingKey).scheduleWithFixedDelay(command, initialDelay, delay, unit); } - protected long getThreadID(long orderingKey) { - // skip hashcode generation in this special case - if (threadIds.length == 1) { - return threadIds[0]; - } - - return threadIds[MathUtils.signSafeMod(orderingKey, threadIds.length)]; - } - - /** - * {@inheritDoc} - */ - @Override - public void shutdown() { - for (int i = 0; i < threads.length; i++) { - threads[i].shutdown(); - } - } - - /** - * {@inheritDoc} - */ - @Override - public List shutdownNow() { - List runnables = new ArrayList(); - for (ScheduledExecutorService executor : threads) { - runnables.addAll(executor.shutdownNow()); - } - return runnables; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isShutdown() { - for (ScheduledExecutorService executor : threads) { - if (!executor.isShutdown()) { - return false; - } - } - return true; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { - boolean ret = true; - for (int i = 0; i < threads.length; i++) { - ret = ret && threads[i].awaitTermination(timeout, unit); - } - return ret; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean isTerminated() { - for (ScheduledExecutorService executor : threads) { - if (!executor.isTerminated()) { - return false; - } - } - return true; - } - - /** - * Force threads shutdown (cancel active requests) after specified delay, - * to be used after shutdown() rejects new requests. - */ - public void forceShutdown(long timeout, TimeUnit unit) { - for (int i = 0; i < threads.length; i++) { - try { - if (!threads[i].awaitTermination(timeout, unit)) { - threads[i].shutdownNow(); - } - } catch (InterruptedException exception) { - threads[i].shutdownNow(); - Thread.currentThread().interrupt(); - } - } - } // // Methods for implementing {@link ScheduledExecutorService} @@ -596,74 +281,4 @@ public ScheduledFuture scheduleWithFixedDelay(Runnable command, return chooseThread().scheduleWithFixedDelay(command, initialDelay, delay, unit); } - /** - * {@inheritDoc} - */ - @Override - public Future submit(Callable task) { - return chooseThread().submit(task); - } - - /** - * {@inheritDoc} - */ - @Override - public Future submit(Runnable task, T result) { - return chooseThread().submit(task, result); - } - - /** - * {@inheritDoc} - */ - @Override - public Future submit(Runnable task) { - return chooseThread().submit(task); - } - - /** - * {@inheritDoc} - */ - @Override - public List> invokeAll(Collection> tasks) - throws InterruptedException { - return chooseThread().invokeAll(tasks); - } - - /** - * {@inheritDoc} - */ - @Override - public List> invokeAll(Collection> tasks, - long timeout, - TimeUnit unit) - throws InterruptedException { - return chooseThread().invokeAll(tasks, timeout, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public T invokeAny(Collection> tasks) - throws InterruptedException, ExecutionException { - return chooseThread().invokeAny(tasks); - } - - /** - * {@inheritDoc} - */ - @Override - public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) - throws InterruptedException, ExecutionException, TimeoutException { - return chooseThread().invokeAny(tasks, timeout, unit); - } - - /** - * {@inheritDoc} - */ - @Override - public void execute(Runnable command) { - chooseThread().execute(command); - } - } diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java index 89214e3023f..6a52ef597c1 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Retries.java @@ -94,7 +94,7 @@ public static CompletableFuture run( scheduler, null); } else { - scheduler.submitOrdered(key, () -> { + scheduler.executeOrdered(key, () -> { execute( future, backoffs.iterator(), diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java index 546a8bafaae..d5f6c8dcd80 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java @@ -88,6 +88,7 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.LedgerMetadata; import org.apache.bookkeeper.client.UpdateLedgerOp; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.discover.RegistrationManager; @@ -112,7 +113,6 @@ import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.LedgerIdFormatter; import org.apache.bookkeeper.util.MathUtils; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.Tool; import org.apache.bookkeeper.versioning.Version; import org.apache.bookkeeper.versioning.Versioned; @@ -814,7 +814,7 @@ int runCmd(CommandLine cmdLine) throws Exception { } else { // Use BookieClient to target a specific bookie EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); - OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder() + OrderedExecutor executor = OrderedExecutor.newBuilder() .numThreads(1) .name("BookieClientScheduler") .build(); @@ -1022,6 +1022,7 @@ long getLedgerId() { return ledgerId; } + @Override public void operationComplete(int rc, LedgerMetadata result) { if (rc != 0) { setException(BKException.create(rc)); @@ -2489,6 +2490,7 @@ String getDescription() { return "Convert bookie indexes from InterleavedStorage to DbLedgerStorage format"; } + @Override String getUsage() { return CMD_CONVERT_TO_DB_STORAGE; } @@ -2584,6 +2586,7 @@ String getDescription() { return "Convert bookie indexes from DbLedgerStorage to InterleavedStorage format"; } + @Override String getUsage() { return CMD_CONVERT_TO_INTERLEAVED_STORAGE; } @@ -2699,6 +2702,7 @@ String getDescription() { return "Rebuild DbLedgerStorage locations index by scanning the entry logs"; } + @Override String getUsage() { return CMD_REBUILD_DB_LEDGER_LOCATIONS_INDEX; } @@ -3116,6 +3120,7 @@ public void remove() { }; return new Iterable>() { + @Override public Iterator> iterator() { return iterator; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index f0c524998ea..98626657fcb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -57,6 +57,8 @@ import org.apache.bookkeeper.client.api.DeleteBuilder; import org.apache.bookkeeper.client.api.OpenBuilder; import org.apache.bookkeeper.client.api.WriteFlag; +import org.apache.bookkeeper.common.util.OrderedExecutor; +import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.AbstractConfiguration; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.feature.Feature; @@ -79,7 +81,6 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.ReflectionUtils; import org.apache.bookkeeper.util.SafeRunnable; import org.apache.commons.configuration.ConfigurationException; @@ -134,8 +135,8 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper { final BookieClient bookieClient; final BookieWatcher bookieWatcher; - final OrderedSafeExecutor mainWorkerPool; - final ScheduledExecutorService scheduler; + final OrderedExecutor mainWorkerPool; + final OrderedScheduler scheduler; final HashedWheelTimer requestTimer; final boolean ownTimer; final FeatureProvider featureProvider; @@ -421,9 +422,8 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo this.reorderReadSequence = conf.isReorderReadSequenceEnabled(); // initialize resources - this.scheduler = Executors - .newSingleThreadScheduledExecutor(new DefaultThreadFactory("BookKeeperClientScheduler")); - this.mainWorkerPool = OrderedSafeExecutor.newBuilder() + this.scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperClientScheduler").build(); + this.mainWorkerPool = OrderedExecutor.newBuilder() .name("BookKeeperClientWorker") .numThreads(conf.getNumWorkerThreads()) .statsLogger(statsLogger) @@ -672,12 +672,12 @@ BookieWatcher getBookieWatcher() { } @VisibleForTesting - OrderedSafeExecutor getMainWorkerPool() { + OrderedExecutor getMainWorkerPool() { return mainWorkerPool; } @VisibleForTesting - ScheduledExecutorService getScheduler() { + OrderedScheduler getScheduler() { return scheduler; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java index c77843a6c78..96c8998db4e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ExplicitLacFlushPolicy.java @@ -116,7 +116,7 @@ public String toString() { } }; try { - scheduledFuture = lh.bk.getMainWorkerPool().scheduleAtFixedRateOrdered(lh.getId(), updateLacTask, + scheduledFuture = lh.bk.getScheduler().scheduleAtFixedRateOrdered(lh.getId(), updateLacTask, explicitLacIntervalInMs, explicitLacIntervalInMs, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException re) { LOG.error("Scheduling of ExplictLastAddConfirmedFlush for ledger: {} has failed because of {}", diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java index 117e359894f..45ec4255a9a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerDeleteOp.java @@ -30,7 +30,7 @@ import org.apache.bookkeeper.client.api.DeleteBuilder; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; -import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback; +import org.apache.bookkeeper.util.OrderedGenericCallback; import org.apache.bookkeeper.versioning.Version; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +39,7 @@ * Encapsulates asynchronous ledger delete operation. * */ -class LedgerDeleteOp extends OrderedSafeGenericCallback { +class LedgerDeleteOp extends OrderedGenericCallback { static final Logger LOG = LoggerFactory.getLogger(LedgerDeleteOp.class); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java index 5c2ff8c6dd7..3f599e28ada 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java @@ -46,7 +46,7 @@ import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.ByteBufList; -import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback; +import org.apache.bookkeeper.util.OrderedGenericCallback; import org.apache.zookeeper.AsyncCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -416,7 +416,7 @@ public void operationComplete(int rc, Void result) { // try again, the previous success (with which this has // conflicted) will have updated the stat other operations // such as (addEnsemble) would update it too. - lh.rereadMetadata(new OrderedSafeGenericCallback( + lh.rereadMetadata(new OrderedGenericCallback( lh.bk.mainWorkerPool, lh.getId()) { @Override public void safeOperationComplete(int rc, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 8db56673987..a9a85bc0360 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -83,7 +83,7 @@ import org.apache.bookkeeper.proto.checksum.MacDigestManager; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback; +import org.apache.bookkeeper.util.OrderedGenericCallback; import org.apache.bookkeeper.util.SafeRunnable; import org.apache.commons.collections4.IteratorUtils; import org.slf4j.Logger; @@ -176,6 +176,7 @@ public class LedgerHandle implements WriteHandle { this.bookieFailureHistory = CacheBuilder.newBuilder() .expireAfterWrite(bk.getConf().getBookieFailureHistoryExpirationMSec(), TimeUnit.MILLISECONDS) .build(new CacheLoader() { + @Override public Long load(BookieSocketAddress key) { return -1L; } @@ -200,10 +201,12 @@ public long getBookiePendingRequests(BookieSocketAddress bookieSocketAddress) { lacUpdateMissesCounter = bk.getStatsLogger().getCounter(BookKeeperClientStats.LAC_UPDATE_MISSES); bk.getStatsLogger().registerGauge(BookKeeperClientStats.PENDING_ADDS, new Gauge() { - public Integer getDefaultValue() { + @Override + public Integer getDefaultValue() { return 0; } - public Integer getSample() { + @Override + public Integer getSample() { return pendingAddOps.size(); } }); @@ -240,6 +243,7 @@ protected void initializeExplicitLacFlushPolicy() { * * @return the id of the ledger */ + @Override public long getId() { return ledgerId; } @@ -344,6 +348,7 @@ synchronized long addToLength(long delta) { * * @return the length of the ledger in bytes */ + @Override public synchronized long getLength() { return this.length; } @@ -453,7 +458,7 @@ void asyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) * @param rc */ void doAsyncCloseInternal(final CloseCallback cb, final Object ctx, final int rc) { - bk.getMainWorkerPool().submitOrdered(ledgerId, new SafeRunnable() { + bk.getMainWorkerPool().executeOrdered(ledgerId, new SafeRunnable() { @Override public void safeRun() { final long prevLastEntryId; @@ -512,7 +517,7 @@ public void safeRun() { + metadata.getLastEntryId() + " with this many bytes: " + metadata.getLength()); } - final class CloseCb extends OrderedSafeGenericCallback { + final class CloseCb extends OrderedGenericCallback { CloseCb() { super(bk.getMainWorkerPool(), ledgerId); } @@ -520,7 +525,7 @@ final class CloseCb extends OrderedSafeGenericCallback { @Override public void safeOperationComplete(final int rc, Void result) { if (rc == BKException.Code.MetadataVersionException) { - rereadMetadata(new OrderedSafeGenericCallback(bk.getMainWorkerPool(), + rereadMetadata(new OrderedGenericCallback(bk.getMainWorkerPool(), ledgerId) { @Override public void safeOperationComplete(int newrc, LedgerMetadata newMeta) { @@ -830,7 +835,7 @@ CompletableFuture readEntriesInternalAsync(long firstEntry, boolean isRecoveryRead) { PendingReadOp op = new PendingReadOp(this, bk.getScheduler(), firstEntry, lastEntry, isRecoveryRead); if (!bk.isClosed()) { - bk.getMainWorkerPool().submitOrdered(ledgerId, op); + bk.getMainWorkerPool().executeOrdered(ledgerId, op); } else { op.future().completeExceptionally(BKException.create(ClientClosedException)); } @@ -1099,7 +1104,7 @@ public String toString() { } try { - bk.getMainWorkerPool().submitOrdered(ledgerId, op); + bk.getMainWorkerPool().executeOrdered(ledgerId, op); } catch (RejectedExecutionException e) { op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException), LedgerHandle.this, INVALID_ENTRY_ID, 0, op.ctx); @@ -1689,7 +1694,7 @@ public String toString() { * reformed ensemble. On MetadataVersionException, will reread latest * ledgerMetadata and act upon. */ - private final class ChangeEnsembleCb extends OrderedSafeGenericCallback { + private final class ChangeEnsembleCb extends OrderedGenericCallback { private final EnsembleInfo ensembleInfo; private final int curBlockAddCompletions; private final int ensembleChangeIdx; @@ -1749,7 +1754,7 @@ public String toString() { * Callback which is reading the ledgerMetadata present in zk. This will try * to resolve the version conflicts. */ - private final class ReReadLedgerMetadataCb extends OrderedSafeGenericCallback { + private final class ReReadLedgerMetadataCb extends OrderedGenericCallback { private final int rc; private final EnsembleInfo ensembleInfo; private final int curBlockAddCompletions; @@ -2002,11 +2007,11 @@ void recover(GenericCallback finalCb, return; } - writeLedgerConfig(new OrderedSafeGenericCallback(bk.getMainWorkerPool(), ledgerId) { + writeLedgerConfig(new OrderedGenericCallback(bk.getMainWorkerPool(), ledgerId) { @Override public void safeOperationComplete(final int rc, Void result) { if (rc == BKException.Code.MetadataVersionException) { - rereadMetadata(new OrderedSafeGenericCallback(bk.getMainWorkerPool(), + rereadMetadata(new OrderedGenericCallback(bk.getMainWorkerPool(), ledgerId) { @Override public void safeOperationComplete(int rc, LedgerMetadata newMeta) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java index afd56cf2ee3..0eaf0b5d3ae 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandleAdv.java @@ -238,7 +238,7 @@ public String toString() { } try { - bk.getMainWorkerPool().submitOrdered(ledgerId, op); + bk.getMainWorkerPool().executeOrdered(ledgerId, op); } catch (RejectedExecutionException e) { op.cb.addCompleteWithLatency(bk.getReturnRc(BKException.Code.InterruptedException), LedgerHandleAdv.this, op.getEntryId(), 0, op.ctx); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java index 16b577b652a..33cfaf266b2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerOpenOp.java @@ -38,7 +38,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.util.MathUtils; -import org.apache.bookkeeper.util.OrderedSafeExecutor.OrderedSafeGenericCallback; +import org.apache.bookkeeper.util.OrderedGenericCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -183,7 +183,7 @@ public void operationComplete(int rc, LedgerMetadata metadata) { } if (doRecovery) { - lh.recover(new OrderedSafeGenericCallback(bk.getMainWorkerPool(), ledgerId) { + lh.recover(new OrderedGenericCallback(bk.getMainWorkerPool(), ledgerId) { @Override public void safeOperationComplete(int rc, Void result) { if (rc == BKException.Code.OK) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java index dd5058ef091..eb957663350 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java @@ -143,7 +143,7 @@ boolean maybeTimeout() { void timeoutQuorumWait() { try { - lh.bk.getMainWorkerPool().submitOrdered(lh.ledgerId, new SafeRunnable() { + lh.bk.getMainWorkerPool().executeOrdered(lh.ledgerId, new SafeRunnable() { @Override public void safeRun() { if (completed) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index 484e9c734aa..a41207d5357 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -505,7 +505,7 @@ PendingReadOp parallelRead(boolean enabled) { } public void submit() { - lh.bk.getMainWorkerPool().submitOrdered(lh.ledgerId, this); + lh.bk.getMainWorkerPool().executeOrdered(lh.ledgerId, this); } void initiate() { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java index 9d62f72d75a..eef0e70eac2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOnlyLedgerHandle.java @@ -163,7 +163,7 @@ public void onChanged(long lid, LedgerMetadata newMetadata) { } if (Version.Occurred.BEFORE == occurred) { // the metadata is updated try { - bk.getMainWorkerPool().submitOrdered(ledgerId, new MetadataUpdater(newMetadata)); + bk.getMainWorkerPool().executeOrdered(ledgerId, new MetadataUpdater(newMetadata)); } catch (RejectedExecutionException ree) { LOG.error("Failed on submitting updater to update ledger metadata on ledger {} : {}", ledgerId, newMetadata); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java index 285a06aa745..c160b204ae6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java @@ -49,6 +49,7 @@ import org.apache.bookkeeper.auth.ClientAuthProvider; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.SafeRunnable; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; @@ -63,7 +64,6 @@ import org.apache.bookkeeper.tls.SecurityException; import org.apache.bookkeeper.tls.SecurityHandlerFactory; import org.apache.bookkeeper.util.ByteBufList; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -77,7 +77,7 @@ public class BookieClient implements PerChannelBookieClientFactory { // This is global state that should be across all BookieClients AtomicLong totalBytesOutstanding = new AtomicLong(); - OrderedSafeExecutor executor; + OrderedExecutor executor; ScheduledExecutorService scheduler; ScheduledFuture timeoutFuture; @@ -97,7 +97,7 @@ public class BookieClient implements PerChannelBookieClientFactory { private final long bookieErrorThresholdPerInterval; public BookieClient(ClientConfiguration conf, EventLoopGroup eventLoopGroup, - OrderedSafeExecutor executor, ScheduledExecutorService scheduler, + OrderedExecutor executor, ScheduledExecutorService scheduler, StatsLogger statsLogger) throws IOException { this.conf = conf; this.eventLoopGroup = eventLoopGroup; @@ -198,7 +198,7 @@ public void writeLac(final BookieSocketAddress addr, final long ledgerId, final client.obtain((rc, pcbc) -> { if (rc != BKException.Code.OK) { try { - executor.submitOrdered(ledgerId, safeRun(() -> { + executor.executeOrdered(ledgerId, safeRun(() -> { cb.writeLacComplete(rc, ledgerId, addr, ctx); })); } catch (RejectedExecutionException re) { @@ -219,7 +219,7 @@ private void completeAdd(final int rc, final WriteCallback cb, final Object ctx) { try { - executor.submitOrdered(ledgerId, new SafeRunnable() { + executor.executeOrdered(ledgerId, new SafeRunnable() { @Override public void safeRun() { cb.writeComplete(rc, ledgerId, entryId, addr, ctx); @@ -266,7 +266,7 @@ private void completeRead(final int rc, final ReadEntryCallback cb, final Object ctx) { try { - executor.submitOrdered(ledgerId, new SafeRunnable() { + executor.executeOrdered(ledgerId, new SafeRunnable() { @Override public void safeRun() { cb.readEntryComplete(rc, ledgerId, entryId, entry, ctx); @@ -362,7 +362,7 @@ public void readLac(final BookieSocketAddress addr, final long ledgerId, final R client.obtain((rc, pcbc) -> { if (rc != BKException.Code.OK) { try { - executor.submitOrdered(ledgerId, safeRun(() -> { + executor.executeOrdered(ledgerId, safeRun(() -> { cb.readLacComplete(rc, ledgerId, null, null, ctx); })); } catch (RejectedExecutionException re) { @@ -526,7 +526,7 @@ public void writeComplete(int rc, long ledger, long entry, BookieSocketAddress a byte hello[] = "hello".getBytes(UTF_8); long ledger = Long.parseLong(args[2]); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); - OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder() + OrderedExecutor executor = OrderedExecutor.newBuilder() .name("BookieClientWorker") .numThreads(1) .build(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java index 94a43a956ed..7e65ced11a7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java @@ -59,7 +59,7 @@ import org.apache.bookkeeper.auth.AuthProviderFactoryFactory; import org.apache.bookkeeper.auth.AuthToken; import org.apache.bookkeeper.bookie.Bookie; -import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.processor.RequestProcessor; import org.apache.bookkeeper.stats.Counter; @@ -68,7 +68,6 @@ import org.apache.bookkeeper.tls.SecurityException; import org.apache.bookkeeper.tls.SecurityHandlerFactory; import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -93,12 +92,12 @@ public class BookieRequestProcessor implements RequestProcessor { /** * The threadpool used to execute all read entry requests issued to this server. */ - private final OrderedSafeExecutor readThreadPool; + private final OrderedExecutor readThreadPool; /** * The threadpool used to execute all add entry requests issued to this server. */ - private final OrderedSafeExecutor writeThreadPool; + private final OrderedExecutor writeThreadPool; /** * TLS management. @@ -109,12 +108,12 @@ public class BookieRequestProcessor implements RequestProcessor { * The threadpool used to execute all long poll requests issued to this server * after they are done waiting. */ - private final OrderedSafeExecutor longPollThreadPool; + private final OrderedExecutor longPollThreadPool; /** * The threadpool used to execute high priority requests. */ - private final OrderedSafeExecutor highPriorityThreadPool; + private final OrderedExecutor highPriorityThreadPool; /** * The Timer used to time out requests for long polling. @@ -162,11 +161,11 @@ public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie, this.longPollThreadPool = createExecutor( this.serverCfg.getNumLongPollWorkerThreads(), "BookieLongPollThread-" + serverCfg.getBookiePort(), - OrderedScheduler.NO_TASK_LIMIT, statsLogger); + OrderedExecutor.NO_TASK_LIMIT, statsLogger); this.highPriorityThreadPool = createExecutor( this.serverCfg.getNumHighPriorityWorkerThreads(), "BookieHighPriorityThread-" + serverCfg.getBookiePort(), - OrderedScheduler.NO_TASK_LIMIT, statsLogger); + OrderedExecutor.NO_TASK_LIMIT, statsLogger); this.requestTimer = new HashedWheelTimer( new ThreadFactoryBuilder().setNameFormat("BookieRequestTimer-%d").build(), this.serverCfg.getRequestTimerTickDurationMs(), @@ -208,7 +207,7 @@ public void close() { shutdownExecutor(highPriorityThreadPool); } - private OrderedSafeExecutor createExecutor( + private OrderedExecutor createExecutor( int numThreads, String nameFormat, int maxTasksInQueue, @@ -216,7 +215,7 @@ private OrderedSafeExecutor createExecutor( if (numThreads <= 0) { return null; } else { - return OrderedSafeExecutor.newBuilder() + return OrderedExecutor.newBuilder() .numThreads(numThreads) .name(nameFormat) .traceTaskExecution(serverCfg.getEnableTaskExecutionStats()) @@ -226,7 +225,7 @@ private OrderedSafeExecutor createExecutor( } } - private void shutdownExecutor(OrderedSafeExecutor service) { + private void shutdownExecutor(OrderedExecutor service) { if (null != service) { service.shutdown(); } @@ -310,7 +309,7 @@ private void processWriteLacRequestV3(final BookkeeperProtocol.Request r, final if (null == writeThreadPool) { writeLac.run(); } else { - writeThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), writeLac); + writeThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), writeLac); } } @@ -319,14 +318,14 @@ private void processReadLacRequestV3(final BookkeeperProtocol.Request r, final C if (null == readThreadPool) { readLac.run(); } else { - readThreadPool.submitOrdered(r.getAddRequest().getLedgerId(), readLac); + readThreadPool.executeOrdered(r.getAddRequest().getLedgerId(), readLac); } } private void processAddRequestV3(final BookkeeperProtocol.Request r, final Channel c) { WriteEntryProcessorV3 write = new WriteEntryProcessorV3(r, c, this); - final OrderedSafeExecutor threadPool; + final OrderedExecutor threadPool; if (RequestUtils.isHighPriority(r)) { threadPool = highPriorityThreadPool; } else { @@ -337,7 +336,7 @@ private void processAddRequestV3(final BookkeeperProtocol.Request r, final Chann write.run(); } else { try { - threadPool.submitOrdered(r.getAddRequest().getLedgerId(), write); + threadPool.executeOrdered(r.getAddRequest().getLedgerId(), write); } catch (RejectedExecutionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests", @@ -361,7 +360,7 @@ private void processReadRequestV3(final BookkeeperProtocol.Request r, final Chan ExecutorService fenceThread = null == highPriorityThreadPool ? null : highPriorityThreadPool.chooseThread(c); final ReadEntryProcessorV3 read; - final OrderedSafeExecutor threadPool; + final OrderedExecutor threadPool; if (RequestUtils.isLongPollReadRequest(r.getReadRequest())) { ExecutorService lpThread = null == longPollThreadPool ? null : longPollThreadPool.chooseThread(c); @@ -387,7 +386,7 @@ private void processReadRequestV3(final BookkeeperProtocol.Request r, final Chan read.run(); } else { try { - threadPool.submitOrdered(r.getReadRequest().getLedgerId(), read); + threadPool.executeOrdered(r.getReadRequest().getLedgerId(), read); } catch (RejectedExecutionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests", @@ -464,7 +463,7 @@ private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Ch // If it's a high priority add (usually as part of recovery process), we want to make sure it gets // executed as fast as possible, so bypass the normal writeThreadPool and execute in highPriorityThreadPool - final OrderedSafeExecutor threadPool; + final OrderedExecutor threadPool; if (r.isHighPriority()) { threadPool = highPriorityThreadPool; } else { @@ -475,7 +474,7 @@ private void processAddRequest(final BookieProtocol.ParsedAddRequest r, final Ch write.run(); } else { try { - threadPool.submitOrdered(r.getLedgerId(), write); + threadPool.executeOrdered(r.getLedgerId(), write); } catch (RejectedExecutionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Failed to process request to add entry at {}:{}. Too many pending requests", r.ledgerId, @@ -494,7 +493,7 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Channe // If it's a high priority read (fencing or as part of recovery process), we want to make sure it // gets executed as fast as possible, so bypass the normal readThreadPool // and execute in highPriorityThreadPool - final OrderedSafeExecutor threadPool; + final OrderedExecutor threadPool; if (r.isHighPriority() || r.isFencing()) { threadPool = highPriorityThreadPool; } else { @@ -505,7 +504,7 @@ private void processReadRequest(final BookieProtocol.ReadRequest r, final Channe read.run(); } else { try { - threadPool.submitOrdered(r.getLedgerId(), read); + threadPool.executeOrdered(r.getLedgerId(), read); } catch (RejectedExecutionException e) { if (LOG.isDebugEnabled()) { LOG.debug("Failed to process request to read entry at {}:{}. Too many pending requests", r.ledgerId, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 439320b381e..20fcdda0956 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -85,6 +85,7 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeperClientStats; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; @@ -120,7 +121,6 @@ import org.apache.bookkeeper.tls.SecurityHandlerFactory.NodeType; import org.apache.bookkeeper.util.ByteBufList; import org.apache.bookkeeper.util.MathUtils; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.SafeRunnable; import org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap; import org.slf4j.Logger; @@ -149,7 +149,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { final BookieSocketAddress addr; final EventLoopGroup eventLoopGroup; - final OrderedSafeExecutor executor; + final OrderedExecutor executor; final long addEntryTimeoutNanos; final long readEntryTimeoutNanos; final int maxFrameSize; @@ -213,13 +213,13 @@ enum ConnectionState { private final ExtensionRegistry extRegistry; private final SecurityHandlerFactory shFactory; - public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup, + public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup, BookieSocketAddress addr) throws SecurityException { this(new ClientConfiguration(), executor, eventLoopGroup, addr, NullStatsLogger.INSTANCE, null, null, null); } - public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup eventLoopGroup, + public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup, BookieSocketAddress addr, ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry) throws SecurityException { @@ -227,7 +227,7 @@ public PerChannelBookieClient(OrderedSafeExecutor executor, EventLoopGroup event authProviderFactory, extRegistry, null); } - public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor, + public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor, EventLoopGroup eventLoopGroup, BookieSocketAddress addr, StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry, @@ -236,7 +236,7 @@ public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor exec authProviderFactory, extRegistry, pcbcPool, null); } - public PerChannelBookieClient(ClientConfiguration conf, OrderedSafeExecutor executor, + public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor, EventLoopGroup eventLoopGroup, BookieSocketAddress addr, StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry, @@ -1111,7 +1111,7 @@ private void readV2Response(final BookieProtocol.Response response) { response.release(); } else { long orderingKey = completionValue.ledgerId; - executor.submitOrdered(orderingKey, + executor.executeOrdered(orderingKey, ReadV2ResponseCallback.create(completionValue, response.ledgerId, response.entryId, status, response)); } @@ -1226,7 +1226,7 @@ private void readV3Response(final Response response) { } } else { long orderingKey = completionValue.ledgerId; - executor.submitOrdered(orderingKey, new SafeRunnable() { + executor.executeOrdered(orderingKey, new SafeRunnable() { @Override public void safeRun() { completionValue.handleV3Response(response); @@ -1400,7 +1400,7 @@ public void setOutstanding() { } protected void errorOutAndRunCallback(final Runnable callback) { - executor.submitOrdered(ledgerId, + executor.executeOrdered(ledgerId, new SafeRunnable() { @Override public void safeRun() { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java new file mode 100644 index 00000000000..73150ad0112 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedGenericCallback.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.util; + +import java.util.concurrent.RejectedExecutionException; + +import org.apache.bookkeeper.common.util.OrderedExecutor; +import org.apache.bookkeeper.common.util.SafeRunnable; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Generic callback implementation which will run the + * callback in the thread which matches the ordering key. + */ +public abstract class OrderedGenericCallback implements GenericCallback { + private static final Logger LOG = LoggerFactory.getLogger(OrderedGenericCallback.class); + + private final OrderedExecutor executor; + private final long orderingKey; + + /** + * @param executor The executor on which to run the callback + * @param orderingKey Key used to decide which thread the callback + * should run on. + */ + public OrderedGenericCallback(OrderedExecutor executor, long orderingKey) { + this.executor = executor; + this.orderingKey = orderingKey; + } + + @Override + public final void operationComplete(final int rc, final T result) { + // during closing, callbacks that are error out might try to submit to + // the scheduler again. if the submission will go to same thread, we + // don't need to submit to executor again. this is also an optimization for + // callback submission + if (Thread.currentThread().getId() == executor.getThreadID(orderingKey)) { + safeOperationComplete(rc, result); + } else { + try { + executor.executeOrdered(orderingKey, new SafeRunnable() { + @Override + public void safeRun() { + safeOperationComplete(rc, result); + } + @Override + public String toString() { + return String.format("Callback(key=%s, name=%s)", + orderingKey, + OrderedGenericCallback.this); + } + }); + } catch (RejectedExecutionException re) { + LOG.warn("Failed to submit callback for {} : ", orderingKey, re); + } + } + } + + public abstract void safeOperationComplete(int rc, T result); +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java deleted file mode 100644 index f3af7df7605..00000000000 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/OrderedSafeExecutor.java +++ /dev/null @@ -1,280 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.bookkeeper.util; - -import com.google.common.util.concurrent.ListenableFuture; - -import io.netty.util.concurrent.DefaultThreadFactory; - -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; - -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This class provides 2 things over the java {@link ScheduledExecutorService}. - * - *

1. It takes {@link SafeRunnable objects} instead of plain Runnable objects. - * This means that exceptions in scheduled tasks wont go unnoticed and will be - * logged. - * - *

2. It supports submitting tasks with an ordering key, so that tasks submitted - * with the same key will always be executed in order, but tasks across - * different keys can be unordered. This retains parallelism while retaining the - * basic amount of ordering we want (e.g. , per ledger handle). Ordering is - * achieved by hashing the key objects to threads by their {@link #hashCode()} - * method. - * - *

Note: deprecated since 4.6.0, in favor of using {@link org.apache.bookkeeper.common.util.OrderedScheduler}. - */ -public class OrderedSafeExecutor extends org.apache.bookkeeper.common.util.OrderedScheduler { - - public static Builder newBuilder() { - return new Builder(); - } - - /** - * A builder class for an OrderedSafeExecutor. - */ - public static class Builder extends AbstractBuilder { - - public OrderedSafeExecutor build() { - if (null == threadFactory) { - threadFactory = new DefaultThreadFactory("bookkeeper-ordered-safe-executor"); - } - return new OrderedSafeExecutor(name, numThreads, threadFactory, statsLogger, - traceTaskExecution, warnTimeMicroSec, maxTasksInQueue); - } - } - - /** - * Constructs Safe executor. - * - * @param numThreads - * - number of threads - * @param baseName - * - base name of executor threads - * @param threadFactory - * - for constructing threads - * @param statsLogger - * - for reporting executor stats - * @param traceTaskExecution - * - should we stat task execution - * @param warnTimeMicroSec - * - log long task exec warning after this interval - * @param maxTasksInQueue - * - maximum items allowed in a thread queue. -1 for no limit - */ - private OrderedSafeExecutor(String baseName, int numThreads, ThreadFactory threadFactory, - StatsLogger statsLogger, boolean traceTaskExecution, - long warnTimeMicroSec, int maxTasksInQueue) { - super(baseName, numThreads, threadFactory, statsLogger, traceTaskExecution, warnTimeMicroSec, maxTasksInQueue); - } - - /** - * Schedules a one time action to execute. - */ - public void submit(SafeRunnable r) { - super.submit(r); - } - - /** - * Schedules a one time action to execute with an ordering guarantee on the key. - * @param orderingKey - * @param r - */ - public ListenableFuture submitOrdered(Object orderingKey, SafeRunnable r) { - return super.submitOrdered(orderingKey, r); - } - - /** - * Schedules a one time action to execute with an ordering guarantee on the key. - * @param orderingKey - * @param r - */ - public void submitOrdered(long orderingKey, SafeRunnable r) { - super.submitOrdered(orderingKey, r); - } - - /** - * Schedules a one time action to execute with an ordering guarantee on the key. - * @param orderingKey - * @param r - */ - public void submitOrdered(int orderingKey, SafeRunnable r) { - super.submitOrdered(orderingKey, r); - } - - /** - * Creates and executes a one-shot action that becomes enabled after the given delay. - * - * @param command - the SafeRunnable to execute - * @param delay - the time from now to delay execution - * @param unit - the time unit of the delay parameter - * @return a ScheduledFuture representing pending completion of the task and whose get() method - * will return null upon completion - */ - public ScheduledFuture schedule(SafeRunnable command, long delay, TimeUnit unit) { - return super.schedule(command, delay, unit); - } - - /** - * Creates and executes a one-shot action that becomes enabled after the given delay. - * - * @param orderingKey - the key used for ordering - * @param command - the SafeRunnable to execute - * @param delay - the time from now to delay execution - * @param unit - the time unit of the delay parameter - * @return a ScheduledFuture representing pending completion of the task and whose get() method - * will return null upon completion - */ - public ScheduledFuture scheduleOrdered(Object orderingKey, SafeRunnable command, long delay, TimeUnit unit) { - return super.scheduleOrdered(orderingKey, command, delay, unit); - } - - /** - * Creates and executes a periodic action that becomes enabled first after - * the given initial delay, and subsequently with the given period. - * - *

For more details check scheduleAtFixedRate in interface ScheduledExecutorService - * - * @param command - the SafeRunnable to execute - * @param initialDelay - the time to delay first execution - * @param period - the period between successive executions - * @param unit - the time unit of the initialDelay and period parameters - * @return a ScheduledFuture representing pending completion of the task, and whose get() - * method will throw an exception upon cancellation - */ - public ScheduledFuture scheduleAtFixedRate(SafeRunnable command, long initialDelay, long period, TimeUnit unit) { - return super.scheduleAtFixedRate(command, initialDelay, period, unit); - } - - /** - * Creates and executes a periodic action that becomes enabled first after - * the given initial delay, and subsequently with the given period. - * - *

For more details check scheduleAtFixedRate in interface ScheduledExecutorService - * - * @param orderingKey - the key used for ordering - * @param command - the SafeRunnable to execute - * @param initialDelay - the time to delay first execution - * @param period - the period between successive executions - * @param unit - the time unit of the initialDelay and period parameters - * @return a ScheduledFuture representing pending completion of the task, and whose get() method - * will throw an exception upon cancellation - */ - public ScheduledFuture scheduleAtFixedRateOrdered(Object orderingKey, SafeRunnable command, long initialDelay, - long period, TimeUnit unit) { - return super.scheduleAtFixedRateOrdered(orderingKey, command, initialDelay, period, unit); - } - - /** - * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently - * with the given delay between the termination of one execution and the commencement of the next. - * - *

For more details check scheduleWithFixedDelay in interface ScheduledExecutorService - * - * @param command - the SafeRunnable to execute - * @param initialDelay - the time to delay first execution - * @param delay - the delay between the termination of one execution and the commencement of the next - * @param unit - the time unit of the initialDelay and delay parameters - * @return a ScheduledFuture representing pending completion of the task, and whose get() method - * will throw an exception upon cancellation - */ - public ScheduledFuture scheduleWithFixedDelay(SafeRunnable command, long initialDelay, long delay, - TimeUnit unit) { - return super.scheduleWithFixedDelay(command, initialDelay, delay, unit); - } - - /** - * Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently - * with the given delay between the termination of one execution and the commencement of the next. - * - *

For more details check scheduleWithFixedDelay in interface ScheduledExecutorService - * - * @param orderingKey - the key used for ordering - * @param command - the SafeRunnable to execute - * @param initialDelay - the time to delay first execution - * @param delay - the delay between the termination of one execution and the commencement of the next - * @param unit - the time unit of the initialDelay and delay parameters - * @return a ScheduledFuture representing pending completion of the task, and whose get() method - * will throw an exception upon cancellation - */ - public ScheduledFuture scheduleWithFixedDelayOrdered(Object orderingKey, SafeRunnable command, long initialDelay, - long delay, TimeUnit unit) { - return super.scheduleWithFixedDelayOrdered(orderingKey, command, initialDelay, delay, unit); - } - - /** - * Generic callback implementation which will run the - * callback in the thread which matches the ordering key. - */ - public abstract static class OrderedSafeGenericCallback - implements GenericCallback { - private static final Logger LOG = LoggerFactory.getLogger(OrderedSafeGenericCallback.class); - - private final OrderedSafeExecutor executor; - private final long orderingKey; - - /** - * @param executor The executor on which to run the callback - * @param orderingKey Key used to decide which thread the callback - * should run on. - */ - public OrderedSafeGenericCallback(OrderedSafeExecutor executor, long orderingKey) { - this.executor = executor; - this.orderingKey = orderingKey; - } - - @Override - public final void operationComplete(final int rc, final T result) { - // during closing, callbacks that are error out might try to submit to - // the scheduler again. if the submission will go to same thread, we - // don't need to submit to executor again. this is also an optimization for - // callback submission - if (Thread.currentThread().getId() == executor.getThreadID(orderingKey)) { - safeOperationComplete(rc, result); - } else { - try { - executor.submitOrdered(orderingKey, new SafeRunnable() { - @Override - public void safeRun() { - safeOperationComplete(rc, result); - } - @Override - public String toString() { - return String.format("Callback(key=%s, name=%s)", - orderingKey, - OrderedSafeGenericCallback.this); - } - }); - } catch (RejectedExecutionException re) { - LOG.warn("Failed to submit callback for {} : ", orderingKey, re); - } - } - } - - public abstract void safeOperationComplete(int rc, T result); - } -} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java index e686742c504..5efd7bde7b2 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java @@ -29,8 +29,10 @@ import static org.mockito.Mockito.when; import com.google.common.base.Optional; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; + import java.security.GeneralSecurityException; import java.util.ArrayList; import java.util.HashSet; @@ -41,15 +43,16 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.bookkeeper.client.BKException.BKDigestMatchException; import org.apache.bookkeeper.client.BKException.Code; import org.apache.bookkeeper.client.api.CreateBuilder; import org.apache.bookkeeper.client.api.DeleteBuilder; import org.apache.bookkeeper.client.api.OpenBuilder; +import org.apache.bookkeeper.common.util.OrderedExecutor; +import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.meta.LedgerIdGenerator; @@ -61,7 +64,6 @@ import org.apache.bookkeeper.proto.checksum.DigestManager; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.ByteBufList; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.junit.After; import org.junit.Before; import org.mockito.invocation.InvocationOnMock; @@ -76,8 +78,8 @@ public abstract class MockBookKeeperTestCase { private static final Logger LOG = LoggerFactory.getLogger(MockBookKeeperTestCase.class); - protected ScheduledExecutorService scheduler; - protected OrderedSafeExecutor executor; + protected OrderedScheduler scheduler; + protected OrderedExecutor executor; protected BookKeeper bk; protected BookieClient bookieClient; protected LedgerManager ledgerManager; @@ -128,8 +130,8 @@ public void setup() throws Exception { mockLedgerData = new ConcurrentHashMap<>(); mockNextLedgerId = new AtomicLong(1); fencedLedgers = new ConcurrentSkipListSet<>(); - scheduler = new ScheduledThreadPoolExecutor(4); - executor = OrderedSafeExecutor.newBuilder().build(); + scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(4).name("bk-test").build(); + executor = OrderedExecutor.newBuilder().build(); bookieWatcher = mock(BookieWatcher.class); bookieClient = mock(BookieClient.class); @@ -312,7 +314,7 @@ private void setupReadLedgerMetadata() { doAnswer(invocation -> { Object[] args = invocation.getArguments(); Long ledgerId = (Long) args[0]; - executor.submitOrdered(ledgerId, () -> { + executor.executeOrdered(ledgerId, () -> { BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[1]; LedgerMetadata ledgerMetadata = mockLedgerMetadataRegistry.get(ledgerId); if (ledgerMetadata == null) { @@ -330,7 +332,7 @@ private void setupRemoveLedgerMetadata() { doAnswer(invocation -> { Object[] args = invocation.getArguments(); Long ledgerId = (Long) args[0]; - executor.submitOrdered(ledgerId, () -> { + executor.executeOrdered(ledgerId, () -> { BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2]; if (mockLedgerMetadataRegistry.remove(ledgerId) != null) { cb.operationComplete(BKException.Code.OK, null); @@ -368,7 +370,7 @@ private void setupCreateLedgerMetadata() { Object[] args = invocation.getArguments(); BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2]; Long ledgerId = (Long) args[0]; - executor.submitOrdered(ledgerId, () -> { + executor.executeOrdered(ledgerId, () -> { LedgerMetadata ledgerMetadata = (LedgerMetadata) args[1]; mockLedgerMetadataRegistry.put(ledgerId, new LedgerMetadata(ledgerMetadata)); cb.operationComplete(BKException.Code.OK, null); @@ -384,7 +386,7 @@ private void setupWriteLedgerMetadata() { Long ledgerId = (Long) args[0]; LedgerMetadata metadata = (LedgerMetadata) args[1]; BookkeeperInternalCallbacks.GenericCallback cb = (BookkeeperInternalCallbacks.GenericCallback) args[2]; - executor.submitOrdered(ledgerId, () -> { + executor.executeOrdered(ledgerId, () -> { mockLedgerMetadataRegistry.put(ledgerId, new LedgerMetadata(metadata)); cb.operationComplete(BKException.Code.OK, null); }); @@ -403,7 +405,7 @@ protected void setupBookieClientReadEntry() { (BookkeeperInternalCallbacks.ReadEntryCallback) args[3]; boolean fenced = (((Integer) args[5]) & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING; - executor.submitOrdered(ledgerId, () -> { + executor.executeOrdered(ledgerId, () -> { DigestManager macManager = null; try { macManager = getDigestType(ledgerId); @@ -478,7 +480,7 @@ protected void setupBookieClientAddEntry() { boolean isRecoveryAdd = ((short) options & BookieProtocol.FLAG_RECOVERY_ADD) == BookieProtocol.FLAG_RECOVERY_ADD; - executor.submitOrdered(ledgerId, () -> { + executor.executeOrdered(ledgerId, () -> { byte[] entry; try { entry = extractEntryPayload(ledgerId, entryId, toSend); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java index bf6c2305ff8..a91dffae3eb 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestGetBookieInfoTimeout.java @@ -34,6 +34,7 @@ import org.apache.bookkeeper.client.BKException.Code; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieClient; @@ -41,7 +42,6 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -56,7 +56,7 @@ public class TestGetBookieInfoTimeout extends BookKeeperClusterTestCase { private static final Logger LOG = LoggerFactory.getLogger(TestGetBookieInfoTimeout.class); DigestType digestType; public EventLoopGroup eventLoopGroup; - public OrderedSafeExecutor executor; + public OrderedExecutor executor; private ScheduledExecutorService scheduler; public TestGetBookieInfoTimeout() { @@ -69,7 +69,7 @@ public void setUp() throws Exception { super.setUp(); eventLoopGroup = new NioEventLoopGroup(); - executor = OrderedSafeExecutor.newBuilder() + executor = OrderedExecutor.newBuilder() .name("BKClientOrderedSafeExecutor") .numThreads(2) .build(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java index 59811d119dc..a07acef811a 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestBackwardCompatCMS42.java @@ -38,6 +38,7 @@ import org.apache.bookkeeper.auth.AuthProviderFactoryFactory; import org.apache.bookkeeper.auth.ClientAuthProvider; import org.apache.bookkeeper.auth.TestAuth; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; @@ -49,7 +50,6 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.junit.Test; /** @@ -64,7 +64,7 @@ public class TestBackwardCompatCMS42 extends BookKeeperClusterTestCase { ExtensionRegistry extRegistry = ExtensionRegistry.newInstance(); ClientAuthProvider.Factory authProvider; EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); - OrderedSafeExecutor executor = OrderedSafeExecutor.newBuilder().numThreads(1).name("TestBackwardCompatClient") + OrderedExecutor executor = OrderedExecutor.newBuilder().numThreads(1).name("TestBackwardCompatClient") .build(); public TestBackwardCompatCMS42() throws Exception { @@ -191,7 +191,7 @@ class CompatClient42 extends PerChannelBookieClient { final CountDownLatch connected = new CountDownLatch(1); CompatClient42(ClientConfiguration conf, - OrderedSafeExecutor executor, + OrderedExecutor executor, EventLoopGroup eventLoopGroup, BookieSocketAddress addr, ClientAuthProvider.Factory authProviderFactory, diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java index 90788b1e766..d96dce1044d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/TestPerChannelBookieClient.java @@ -39,6 +39,7 @@ import org.apache.bookkeeper.auth.ClientAuthProvider; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.net.BookieSocketAddress; @@ -46,7 +47,6 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; import org.apache.bookkeeper.proto.PerChannelBookieClient.ConnectionState; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.apache.bookkeeper.util.SafeRunnable; import org.junit.Test; import org.slf4j.Logger; @@ -79,7 +79,7 @@ public TestPerChannelBookieClient() throws Exception { @Test public void testConnectCloseRace() throws Exception { EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); - OrderedSafeExecutor executor = getOrderedSafeExecutor(); + OrderedExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); for (int i = 0; i < 1000; i++) { @@ -98,8 +98,8 @@ public void operationComplete(int rc, PerChannelBookieClient client) { executor.shutdown(); } - public OrderedSafeExecutor getOrderedSafeExecutor() { - return OrderedSafeExecutor.newBuilder() + public OrderedExecutor getOrderedSafeExecutor() { + return OrderedExecutor.newBuilder() .name("PCBC") .numThreads(1) .traceTaskExecution(true) @@ -122,7 +122,7 @@ public void operationComplete(int rc, PerChannelBookieClient pcbc) { } }; EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); - OrderedSafeExecutor executor = getOrderedSafeExecutor(); + OrderedExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); for (int i = 0; i < 100; i++) { @@ -154,7 +154,7 @@ public void operationComplete(int rc, PerChannelBookieClient client) { }; final int iterations = 100000; EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); - OrderedSafeExecutor executor = getOrderedSafeExecutor(); + OrderedExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); final PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup, @@ -250,7 +250,7 @@ public ByteBuf readEntry(long ledgerId, long entryId) bs.add(startBookie(conf, delayBookie)); EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); - final OrderedSafeExecutor executor = getOrderedSafeExecutor(); + final OrderedExecutor executor = getOrderedSafeExecutor(); BookieSocketAddress addr = getBookie(0); final PerChannelBookieClient client = new PerChannelBookieClient(executor, eventLoopGroup, @@ -268,7 +268,7 @@ public void readEntryComplete(int rc, long ledgerId, long entryId, @Override public void operationComplete(final int rc, PerChannelBookieClient pcbc) { if (rc != BKException.Code.OK) { - executor.submitOrdered(1, new SafeRunnable() { + executor.executeOrdered(1, new SafeRunnable() { @Override public void safeRun() { cb.readEntryComplete(rc, 1, 1, null, null); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java index 7ecce0ba90e..29c2e3ce063 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java @@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BKException.Code; import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo; +import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.conf.TestBKConfiguration; @@ -54,7 +55,6 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.util.ByteBufList; import org.apache.bookkeeper.util.IOUtils; -import org.apache.bookkeeper.util.OrderedSafeExecutor; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -68,7 +68,7 @@ public class BookieClientTest { public int port = 13645; public EventLoopGroup eventLoopGroup; - public OrderedSafeExecutor executor; + public OrderedExecutor executor; private ScheduledExecutorService scheduler; @Before @@ -85,7 +85,7 @@ public void setUp() throws Exception { bs = new BookieServer(conf); bs.start(); eventLoopGroup = new NioEventLoopGroup(); - executor = OrderedSafeExecutor.newBuilder() + executor = OrderedExecutor.newBuilder() .name("BKClientOrderedSafeExecutor") .numThreads(2) .build(); diff --git a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml index 1886c2aeb2d..c5f7f070da8 100644 --- a/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml +++ b/buildtools/src/main/resources/bookkeeper/findbugsExclude.xml @@ -123,6 +123,9 @@ + + + diff --git a/microbenchmarks/src/main/java/org/apache/bookkeeper/common/OrderedExecutorBenchmark.java b/microbenchmarks/src/main/java/org/apache/bookkeeper/common/OrderedExecutorBenchmark.java new file mode 100644 index 00000000000..66ce59bc39f --- /dev/null +++ b/microbenchmarks/src/main/java/org/apache/bookkeeper/common/OrderedExecutorBenchmark.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.bookkeeper.common; + +import com.google.common.collect.ImmutableMap; + +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.apache.bookkeeper.common.util.OrderedExecutor; +import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +/** + * Microbenchmarks for different executors providers. + */ +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Threads(16) +@Fork(1) +@Warmup(iterations = 1, time = 10, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS) +public class OrderedExecutorBenchmark { + + private static Map> providers = ImmutableMap.of( // + "JDK-ThreadPool", () -> Executors.newFixedThreadPool(1), + "OrderedExecutor", () -> OrderedExecutor.newBuilder().numThreads(1).build(), // + "OrderedScheduler", () -> OrderedScheduler.newSchedulerBuilder().numThreads(1).build()); + + @State(Scope.Benchmark) + public static class TestState { + @Param({ "JDK-ThreadPool", "OrderedExecutor", "OrderedScheduler" }) + private String executorName; + + private ExecutorService executor; + + @Setup(Level.Trial) + public void setup() { + executor = providers.get(executorName).get(); + } + + @TearDown(Level.Trial) + public void teardown() { + executor.shutdown(); + } + } + + @Benchmark + public void submitAndWait(TestState s) throws Exception { + s.executor.submit(() -> { + }).get(); + } +} diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java index 6adcbc420f2..3c8dfb7aa1e 100644 --- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java +++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java @@ -391,6 +391,7 @@ public synchronized CompletableFuture readNext() { return readInternal(1, 0, TimeUnit.MILLISECONDS).thenApply(READ_NEXT_MAP_FUNCTION); } + @Override public synchronized CompletableFuture> readBulk(int numEntries) { return readInternal(numEntries, 0, TimeUnit.MILLISECONDS); } @@ -477,7 +478,7 @@ public synchronized void scheduleBackgroundRead() { long prevCount = scheduleCountUpdater.getAndIncrement(this); if (0 == prevCount) { scheduleDelayStopwatch.reset().start(); - scheduler.submitOrdered(streamName, this); + scheduler.executeOrdered(streamName, this); } } diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java index a0dd6ad124c..c9bca44400d 100644 --- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java +++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java @@ -394,7 +394,7 @@ private void orderedSubmit(SafeRunnable runnable) { } } try { - scheduler.submitOrdered(streamName, runnable); + scheduler.executeOrdered(streamName, runnable); } catch (RejectedExecutionException ree) { logger.debug("Failed to submit and execute an operation for readhead entry reader of {}", streamName, ree); @@ -449,7 +449,7 @@ public CompletableFuture asyncClose() { // use runnable here instead of CloseableRunnable, // because we need this to be executed try { - scheduler.submitOrdered(streamName, () -> unsafeAsyncClose(closeFuture)); + scheduler.executeOrdered(streamName, () -> unsafeAsyncClose(closeFuture)); } catch (RejectedExecutionException ree) { logger.warn("Scheduler has been shutdown before closing the readahead entry reader for stream {}", streamName, ree); diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java index 5a9ee1a6c4b..d78b4550f72 100644 --- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java +++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java @@ -212,7 +212,7 @@ protected void submitTask(Object key, SafeRunnable r) { if (closed) { return; } - scheduler.submitOrdered(key, r); + scheduler.executeOrdered(key, r); } finally { closeLock.readLock().unlock(); } diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java index 898c1138248..813e9742a30 100644 --- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java +++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java @@ -691,7 +691,7 @@ private void processReadRequests() { long prevCount = scheduleCountUpdater.getAndIncrement(this); if (0 == prevCount) { - scheduler.submitOrdered(getSegment().getLogSegmentId(), this); + scheduler.executeOrdered(getSegment().getLogSegmentId(), this); } } diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java index 7948084fa67..107c5e3174e 100644 --- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java +++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java @@ -135,6 +135,7 @@ private synchronized void checkLockState() throws LockingException { * Asynchronously acquire the lock. Technically the try phase of this operation--which adds us to the waiter * list--is executed synchronously, but the lock wait itself doesn't block. */ + @Override public synchronized CompletableFuture asyncAcquire() { if (null != lockAcquireFuture) { return FutureUtils.exception( @@ -145,7 +146,7 @@ public synchronized CompletableFuture asyncAcquire() { if (null == throwable || !(throwable instanceof CancellationException)) { return; } - lockStateExecutor.submitOrdered(lockPath, () -> asyncClose()); + lockStateExecutor.executeOrdered(lockPath, () -> asyncClose()); }); final Stopwatch stopwatch = Stopwatch.createStarted(); promise.whenComplete(new FutureEventListener() { @@ -165,7 +166,7 @@ public void onFailure(Throwable cause) { } }); this.lockAcquireFuture = promise; - lockStateExecutor.submitOrdered( + lockStateExecutor.executeOrdered( lockPath, () -> doAsyncAcquireWithSemaphore(promise, lockTimeout)); return promise; } @@ -293,6 +294,7 @@ public void onExpired() { * * @throws LockingException if the lock attempt fails */ + @Override public synchronized void checkOwnershipAndReacquire() throws LockingException { if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) { throw new LockingException(lockPath, "check ownership before acquiring"); @@ -315,6 +317,7 @@ public synchronized void checkOwnershipAndReacquire() throws LockingException { * * @throws LockingException if the lock attempt fails */ + @Override public synchronized void checkOwnership() throws LockingException { if (null == lockAcquireFuture || !lockAcquireFuture.isDone()) { throw new LockingException(lockPath, "check ownership before acquiring"); @@ -426,7 +429,7 @@ private void complete() { FutureUtils.complete(closePromise, null); } }, lockStateExecutor.chooseThread(lockPath)); - lockStateExecutor.submitOrdered( + lockStateExecutor.executeOrdered( lockPath, () -> closeWaiter(lockWaiter, closeWaiterFuture)); return closePromise; } @@ -434,7 +437,7 @@ private void complete() { void internalReacquireLock(final AtomicInteger numRetries, final long lockTimeout, final CompletableFuture reacquirePromise) { - lockStateExecutor.submitOrdered( + lockStateExecutor.executeOrdered( lockPath, () -> doInternalReacquireLock(numRetries, lockTimeout, reacquirePromise)); } diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java index 3d7933631da..84c516c344a 100644 --- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java +++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java @@ -168,6 +168,7 @@ public static String getLockIdFromPath(String path) { } static final Comparator MEMBER_COMPARATOR = new Comparator() { + @Override public int compare(String o1, String o2) { int l1 = parseMemberID(o1); int l2 = parseMemberID(o2); @@ -374,6 +375,7 @@ Pair getLockId() { return lockId; } + @Override public boolean isLockExpired() { return lockState.isExpiredOrClosing(); } @@ -392,7 +394,7 @@ public boolean isLockHeld() { * function to execute a lock action */ protected void executeLockAction(final int lockEpoch, final LockAction func) { - lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() { + lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() { @Override public void safeRun() { if (getEpoch() == lockEpoch) { @@ -431,7 +433,7 @@ public void safeRun() { */ protected void executeLockAction(final int lockEpoch, final LockAction func, final CompletableFuture promise) { - lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() { + lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() { @Override public void safeRun() { int currentEpoch = getEpoch(); @@ -554,7 +556,7 @@ public CompletableFuture asyncTryLock(final long timeout, final Time @Override public void processResult(final int rc, String path, Object ctx, final List children, Stat stat) { - lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() { + lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() { @Override public void safeRun() { if (!lockState.inState(State.INIT)) { @@ -648,7 +650,7 @@ public void onFailure(Throwable cause) { private boolean checkOrClaimLockOwner(final Pair currentOwner, final CompletableFuture result) { if (lockId.compareTo(currentOwner) != 0 && !lockContext.hasLockId(currentOwner)) { - lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() { + lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() { @Override public void safeRun() { result.complete(currentOwner.getLeft()); @@ -878,7 +880,7 @@ CompletableFuture asyncUnlock(final Throwable cause) { // Use lock executor here rather than lock action, because we want this opertaion to be applied // whether the epoch has changed or not. The member node is EPHEMERAL_SEQUENTIAL so there's no // risk of an ABA problem where we delete and recreate a node and then delete it again here. - lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() { + lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() { @Override public void safeRun() { acquireFuture.completeExceptionally(cause); @@ -980,7 +982,7 @@ private void deleteLockNode(final CompletableFuture promise) { zk.delete(currentNode, -1, new AsyncCallback.VoidCallback() { @Override public void processResult(final int rc, final String path, Object ctx) { - lockStateExecutor.submitOrdered(lockPath, new SafeRunnable() { + lockStateExecutor.executeOrdered(lockPath, new SafeRunnable() { @Override public void safeRun() { if (KeeperException.Code.OK.intValue() == rc) { diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java index 517b0dfcd4b..437bb6fab50 100644 --- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java +++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java @@ -65,6 +65,7 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase { private BookKeeperClient bkc; private ZooKeeperClient zkc; + @Override @Before public void setup() throws Exception { super.setup(); @@ -94,6 +95,7 @@ public void setup() throws Exception { .build(); } + @Override @After public void teardown() throws Exception { if (null != bkc) { @@ -138,15 +140,8 @@ private ReadAheadEntryReader createEntryReader(String streamName, private void ensureOrderSchedulerEmpty(String streamName) throws Exception { final CompletableFuture promise = new CompletableFuture(); - scheduler.submitOrdered(streamName, () -> { + scheduler.executeOrdered(streamName, () -> { FutureUtils.complete(promise, null); - // the following line is needed for oraclejdk9 to avoid following exception - // ``` - // incompatible types: inference variable T has incompatible bounds - // upper bounds: java.lang.Object - // lower bounds: void - // ``` - return (Void) null; }); Utils.ioResult(promise); } diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java index 901d2a04cdc..8a9d9c2f82b 100644 --- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java +++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java @@ -779,7 +779,7 @@ public void onExpired() { // expire session ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs); // submit a runnable to lock state executor to ensure any state changes happened when session expired - lockStateExecutor.submitOrdered(lockPath, () -> expiredLatch.countDown()); + lockStateExecutor.executeOrdered(lockPath, () -> expiredLatch.countDown()); expiredLatch.await(); // no watcher was registered if never acquired lock successfully assertEquals(State.INIT, lock.getLockState()); @@ -1216,7 +1216,7 @@ private void testLockWhenSiblingUseSameLockId(long timeout, final boolean isUnlo } else { ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs); final CountDownLatch latch = new CountDownLatch(1); - lockStateExecutor.submitOrdered(lockPath, () -> latch.countDown()); + lockStateExecutor.executeOrdered(lockPath, () -> latch.countDown()); latch.await(); children = getLockWaiters(zkc, lockPath); assertEquals(0, children.size()); diff --git a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java index 6155cba5ef9..39e00c06c2b 100644 --- a/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java +++ b/stream/storage/impl/src/main/java/org/apache/bookkeeper/stream/storage/impl/sc/FailRequestStorageContainer.java @@ -79,7 +79,7 @@ public void close() { private CompletableFuture failWrongGroupRequest(long scId) { CompletableFuture future = FutureUtils.createFuture(); - scheduler.submitOrdered(scId, () -> { + scheduler.executeOrdered(scId, () -> { future.completeExceptionally(new StatusRuntimeException(Status.NOT_FOUND)); }); return future; diff --git a/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java b/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java index e462d33d651..eba29f14a47 100644 --- a/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java +++ b/tests/shaded/bookkeeper-server-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/BookKeeperServerShadedJarTest.java @@ -50,7 +50,7 @@ public void testGuavaShadedPath() throws Exception { @Test public void testBookKeeperCommon() throws Exception { - Class.forName("org.apache.bookkeeper.util.OrderedSafeExecutor"); + Class.forName("org.apache.bookkeeper.common.util.OrderedExecutor"); assertTrue(true); } diff --git a/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java b/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java index 45be333af1c..c4519dfdf99 100644 --- a/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java +++ b/tests/shaded/distributedlog-core-shaded-test/src/test/java/org/apache/bookkeeper/tests/shaded/DistributedLogCoreShadedJarTest.java @@ -85,13 +85,13 @@ public void testZooKeeperShadedPath() throws Exception { @Test(expected = ClassNotFoundException.class) public void testBookKeeperCommon() throws Exception { - Class.forName("org.apache.bookkeeper.util.OrderedSafeExecutor"); + Class.forName("org.apache.bookkeeper.common.util.OrderedExecutor"); assertTrue(true); } @Test public void testBookKeeperCommonShade() throws Exception { - Class.forName("dlshade.org.apache.bookkeeper.util.OrderedSafeExecutor"); + Class.forName("dlshade.org.apache.bookkeeper.common.util.OrderedExecutor"); assertTrue(true); }