Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.primitives.Longs;
import org.apache.druid.java.util.common.logger.Logger;

import javax.annotation.Nullable;
import java.util.ArrayList;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[ERROR] /home/travis/build/apache/druid/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java:26: 'javax.annotation.Nullable' should be separated from previous imports. [ImportOrder]

The checkstyle wants an empty line between the Lines 26 and 27.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, it's done.

import java.util.List;
import java.util.concurrent.Future;

/**
*/
public class GuavaUtils
{
private static final Logger log = new Logger(GuavaUtils.class);

/**
* To fix semantic difference of Longs.tryParse() from Long.parseLong (Longs.tryParse() returns null for '+' started
Expand Down Expand Up @@ -77,4 +82,38 @@ public static <T> T firstNonNull(@Nullable T arg1, @Nullable T arg2)
}
return arg1;
}

/**
* Cancel futures manually, because sometime we can't cancel all futures in {@link com.google.common.util.concurrent.Futures.CombinedFuture}
* automatically. Especially when we call {@link com.google.common.util.concurrent.Futures#allAsList(Iterable)} to create a batch of
* future.
* @param mayInterruptIfRunning {@code true} if the thread executing this
* task should be interrupted; otherwise, in-progress tasks are allowed
* to complete
* @param combinedFuture The combinedFuture that associated with futures
* @param futures The futures that we want to cancel
*/
public static <F extends Future<?>> void cancelAll(
boolean mayInterruptIfRunning,
@Nullable Future<?> combinedFuture,
List<F> futures
)
{
final List<Future> allFuturesToCancel = new ArrayList<>();
allFuturesToCancel.add(combinedFuture);
allFuturesToCancel.addAll(futures);
if (allFuturesToCancel.isEmpty()) {
return;
}
allFuturesToCancel.forEach(f -> {
try {
if (f != null) {
f.cancel(mayInterruptIfRunning);
}
}
catch (Throwable t) {
log.warn(t, "Error while cancelling future.");
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,22 @@
package org.apache.druid.common.guava;

import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class GuavaUtilsTest
{
enum MyEnum
Expand Down Expand Up @@ -53,4 +66,50 @@ public void testGetEnumIfPresent()
Assert.assertEquals(MyEnum.BUCKLE_MY_SHOE, GuavaUtils.getEnumIfPresent(MyEnum.class, "BUCKLE_MY_SHOE"));
Assert.assertEquals(null, GuavaUtils.getEnumIfPresent(MyEnum.class, "buckle_my_shoe"));
}

@Test
public void testCancelAll()
{
int tasks = 3;
ExecutorService service = Executors.newFixedThreadPool(tasks);
ListeningExecutorService exc = MoreExecutors.listeningDecorator(service);
AtomicInteger index = new AtomicInteger(0);
//a flag what time to throw exception.
AtomicBoolean active = new AtomicBoolean(false);
Function<Integer, List<ListenableFuture<Object>>> function = (taskCount) -> {
List<ListenableFuture<Object>> futures = new ArrayList<>();
for (int i = 0; i < taskCount; i++) {
ListenableFuture<Object> future = exc.submit(new Callable<Object>() {
@Override
public Object call() throws RuntimeException
{
int internalIndex = index.incrementAndGet();
while (true) {
if (internalIndex == taskCount && active.get()) {
//here we simulate occurs exception in some one future.
throw new RuntimeException("A big bug");
}
}
}
});
futures.add(future);
}
return futures;
};

List<ListenableFuture<Object>> futures = function.apply(tasks);
Assert.assertEquals(tasks, futures.stream().filter(f -> !f.isDone()).count());
//here we make one of task throw exception.
active.set(true);

ListenableFuture<List<Object>> future = Futures.allAsList(futures);
try {
future.get();
}
catch (Exception e) {
Assert.assertEquals("java.lang.RuntimeException: A big bug", e.getMessage());
GuavaUtils.cancelAll(true, future, futures);
Assert.assertEquals(0, futures.stream().filter(f -> !f.isDone()).count());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.MergeIterable;
Expand Down Expand Up @@ -100,7 +101,7 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext respo
public Iterator<T> make()
{
// Make it a List<> to materialize all of the values (so that it will submit everything to the executor)
ListenableFuture<List<Iterable<T>>> futures = Futures.allAsList(
List<ListenableFuture<Iterable<T>>> futures =
Lists.newArrayList(
Iterables.transform(
queryables,
Expand Down Expand Up @@ -141,33 +142,35 @@ public Iterable<T> call()
);
}
)
)
);
);

queryWatcher.registerQueryFuture(query, futures);
ListenableFuture<List<Iterable<T>>> future = Futures.allAsList(futures);
queryWatcher.registerQueryFuture(query, future);

try {
return new MergeIterable<>(
ordering.nullsFirst(),
QueryContexts.hasTimeout(query) ?
futures.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) :
futures.get()
future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) :
future.get()
).iterator();
}
catch (InterruptedException e) {
log.noStackTrace().warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true);
//Note: canceling combinedFuture first so that it can complete with INTERRUPTED as its final state. See ChainedExecutionQueryRunnerTest.testQueryTimeout()
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
catch (CancellationException e) {
throw new QueryInterruptedException(e);
}
catch (TimeoutException e) {
log.warn("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
Throwables.propagateIfPossible(e.getCause());
throw new RuntimeException(e.getCause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import com.google.common.base.Predicates;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
Expand Down Expand Up @@ -93,7 +95,7 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final ResponseContext respo
final boolean bySegment = QueryContexts.isBySegment(query);
final int priority = QueryContexts.getPriority(query);
final QueryPlus<T> threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState();
final ListenableFuture<List<Void>> futures = Futures.allAsList(
final List<ListenableFuture<Void>> futures =
Lists.newArrayList(
Iterables.transform(
queryables,
Expand Down Expand Up @@ -136,15 +138,14 @@ public Void call()
);

if (isSingleThreaded) {
waitForFutureCompletion(query, future, indexAccumulatorPair.lhs);
waitForFutureCompletion(query, ImmutableList.of(future), indexAccumulatorPair.lhs);
}

return future;
}
}
)
)
);
);

if (!isSingleThreaded) {
waitForFutureCompletion(query, futures, indexAccumulatorPair.lhs);
Expand Down Expand Up @@ -173,10 +174,11 @@ public T apply(Row input)

private void waitForFutureCompletion(
GroupByQuery query,
ListenableFuture<?> future,
List<ListenableFuture<Void>> futures,
IncrementalIndex<?> closeOnFailure
)
{
ListenableFuture<List<Void>> future = Futures.allAsList(futures);
try {
queryWatcher.registerQueryFuture(query, future);
if (QueryContexts.hasTimeout(query)) {
Expand All @@ -187,7 +189,7 @@ private void waitForFutureCompletion(
}
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
future.cancel(true);
GuavaUtils.cancelAll(true, future, futures);
closeOnFailure.close();
throw new QueryInterruptedException(e);
}
Expand All @@ -198,10 +200,11 @@ private void waitForFutureCompletion(
catch (TimeoutException e) {
closeOnFailure.close();
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
closeOnFailure.close();
Throwables.propagateIfPossible(e.getCause());
throw new RuntimeException(e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
Expand Down Expand Up @@ -339,8 +340,7 @@ private boolean isParallelizable()
private List<CloseableIterator<Entry<KeyType>>> parallelSortAndGetGroupersIterator()
{
// The number of groupers is same with the number of processing threads in the executor
final ListenableFuture<List<CloseableIterator<Entry<KeyType>>>> future = Futures.allAsList(
groupers.stream()
final List<ListenableFuture<CloseableIterator<Entry<KeyType>>>> futures = groupers.stream()
.map(grouper ->
executor.submit(
new AbstractPrioritizedCallable<CloseableIterator<Entry<KeyType>>>(priority)
Expand All @@ -353,21 +353,20 @@ public CloseableIterator<Entry<KeyType>> call()
}
)
)
.collect(Collectors.toList())
.collect(Collectors.toList()
);

ListenableFuture<List<CloseableIterator<Entry<KeyType>>>> future = Futures.allAsList(futures);
try {
final long timeout = queryTimeoutAt - System.currentTimeMillis();
return hasQueryTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
}
catch (InterruptedException | TimeoutException e) {
future.cancel(true);
throw new QueryInterruptedException(e);
}
catch (CancellationException e) {
catch (InterruptedException | TimeoutException | CancellationException e) {
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
throw new RuntimeException(e.getCause());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.collections.Releaser;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -215,8 +216,7 @@ public CloseableGrouperIterator<RowBasedKey, ResultRow> make()
ReferenceCountingResourceHolder.fromCloseable(grouper);
resources.register(grouperHolder);

ListenableFuture<List<AggregateResult>> futures = Futures.allAsList(
Lists.newArrayList(
List<ListenableFuture<AggregateResult>> futures = Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<ResultRow>, ListenableFuture<AggregateResult>>()
Expand Down Expand Up @@ -259,7 +259,7 @@ public AggregateResult call()
if (isSingleThreaded) {
waitForFutureCompletion(
query,
Futures.allAsList(ImmutableList.of(future)),
ImmutableList.of(future),
hasTimeout,
timeoutAt - System.currentTimeMillis()
);
Expand All @@ -269,8 +269,7 @@ public AggregateResult call()
}
}
)
)
);
);

if (!isSingleThreaded) {
waitForFutureCompletion(query, futures, hasTimeout, timeoutAt - System.currentTimeMillis());
Expand Down Expand Up @@ -339,11 +338,12 @@ private List<ReferenceCountingResourceHolder<ByteBuffer>> getMergeBuffersHolder(

private void waitForFutureCompletion(
GroupByQuery query,
ListenableFuture<List<AggregateResult>> future,
List<ListenableFuture<AggregateResult>> futures,
boolean hasTimeout,
long timeout
)
{
ListenableFuture<List<AggregateResult>> future = Futures.allAsList(futures);
try {
if (queryWatcher != null) {
queryWatcher.registerQueryFuture(query, future);
Expand All @@ -357,25 +357,27 @@ private void waitForFutureCompletion(

for (AggregateResult result : results) {
if (!result.isOk()) {
future.cancel(true);
GuavaUtils.cancelAll(true, future, futures);
throw new ResourceLimitExceededException(result.getReason());
}
}
}
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
future.cancel(true);
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
catch (CancellationException e) {
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
throw new RuntimeException(e);
}
}
Expand Down
Loading