diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java index 655d49de716..3c7bd5089e2 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedExecutorService.java @@ -59,51 +59,67 @@ private void checkQueue(int numberOfTasks) { @Override public List> invokeAll(Collection> tasks) throws InterruptedException { - checkQueue(tasks.size()); - return super.invokeAll(tasks); + synchronized (this) { + checkQueue(tasks.size()); + return super.invokeAll(tasks); + } } @Override public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - checkQueue(tasks.size()); - return super.invokeAll(tasks, timeout, unit); + synchronized (this) { + checkQueue(tasks.size()); + return super.invokeAll(tasks, timeout, unit); + } } @Override public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - checkQueue(tasks.size()); - return super.invokeAny(tasks); + synchronized (this) { + checkQueue(tasks.size()); + return super.invokeAny(tasks); + } } @Override public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - checkQueue(tasks.size()); - return super.invokeAny(tasks, timeout, unit); + synchronized (this) { + checkQueue(tasks.size()); + return super.invokeAny(tasks, timeout, unit); + } } @Override public void execute(Runnable command) { - checkQueue(1); - super.execute(command); + synchronized (this) { + checkQueue(1); + super.execute(command); + } } @Override public Future submit(Callable task) { - checkQueue(1); - return super.submit(task); + synchronized (this) { + checkQueue(1); + return super.submit(task); + } } @Override public Future submit(Runnable task) { - checkQueue(1); - return super.submit(task); + synchronized (this) { + checkQueue(1); + return super.submit(task); + } } @Override public Future submit(Runnable task, T result) { - checkQueue(1); - return super.submit(task, result); + synchronized (this) { + checkQueue(1); + return super.submit(task, result); + } } } diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java index 44c6f38d279..8cf554178eb 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/BoundedScheduledExecutorService.java @@ -59,78 +59,102 @@ protected ListeningExecutorService delegate() { @Override public ListenableScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { - this.checkQueue(1); - return this.thread.schedule(command, delay, unit); + synchronized (this) { + this.checkQueue(1); + return this.thread.schedule(command, delay, unit); + } } @Override public ListenableScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { - this.checkQueue(1); - return this.thread.schedule(callable, delay, unit); + synchronized (this) { + this.checkQueue(1); + return this.thread.schedule(callable, delay, unit); + } } @Override public ListenableScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - this.checkQueue(1); - return this.thread.scheduleAtFixedRate(command, initialDelay, period, unit); + synchronized (this) { + this.checkQueue(1); + return this.thread.scheduleAtFixedRate(command, initialDelay, period, unit); + } } @Override public ListenableScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - this.checkQueue(1); - return this.thread.scheduleAtFixedRate(command, initialDelay, delay, unit); + synchronized (this) { + this.checkQueue(1); + return this.thread.scheduleAtFixedRate(command, initialDelay, delay, unit); + } } @Override public ListenableFuture submit(Callable task) { - this.checkQueue(1); - return super.submit(task); + synchronized (this) { + this.checkQueue(1); + return super.submit(task); + } } @Override public ListenableFuture submit(Runnable task) { - this.checkQueue(1); - return super.submit(task); + synchronized (this) { + this.checkQueue(1); + return super.submit(task); + } } @Override public List> invokeAll(Collection> tasks) throws InterruptedException { - this.checkQueue(tasks.size()); - return super.invokeAll(tasks); + synchronized (this) { + this.checkQueue(tasks.size()); + return super.invokeAll(tasks); + } } @Override public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { - this.checkQueue(tasks.size()); - return super.invokeAll(tasks, timeout, unit); + synchronized (this) { + this.checkQueue(tasks.size()); + return super.invokeAll(tasks, timeout, unit); + } } @Override public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { - this.checkQueue(tasks.size()); - return super.invokeAny(tasks); + synchronized (this) { + this.checkQueue(tasks.size()); + return super.invokeAny(tasks); + } } @Override public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - this.checkQueue(tasks.size()); - return super.invokeAny(tasks, timeout, unit); + synchronized (this) { + this.checkQueue(tasks.size()); + return super.invokeAny(tasks, timeout, unit); + } } @Override public ListenableFuture submit(Runnable task, T result) { - this.checkQueue(1); - return super.submit(task, result); + synchronized (this) { + this.checkQueue(1); + return super.submit(task, result); + } } @Override public void execute(Runnable command) { - this.checkQueue(1); - super.execute(command); + synchronized (this) { + this.checkQueue(1); + super.execute(command); + } } private void checkQueue(int numberOfTasks) {