diff --git a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java index fa69f8286d1a..b3455393e72e 100644 --- a/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java +++ b/core/src/main/java/org/apache/druid/common/guava/GuavaUtils.java @@ -22,13 +22,18 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.primitives.Longs; +import org.apache.druid.java.util.common.logger.Logger; import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Future; /** */ public class GuavaUtils { + private static final Logger log = new Logger(GuavaUtils.class); /** * To fix semantic difference of Longs.tryParse() from Long.parseLong (Longs.tryParse() returns null for '+' started @@ -77,4 +82,38 @@ public static T firstNonNull(@Nullable T arg1, @Nullable T arg2) } return arg1; } + + /** + * Cancel futures manually, because sometime we can't cancel all futures in {@link com.google.common.util.concurrent.Futures.CombinedFuture} + * automatically. Especially when we call {@link com.google.common.util.concurrent.Futures#allAsList(Iterable)} to create a batch of + * future. + * @param mayInterruptIfRunning {@code true} if the thread executing this + * task should be interrupted; otherwise, in-progress tasks are allowed + * to complete + * @param combinedFuture The combinedFuture that associated with futures + * @param futures The futures that we want to cancel + */ + public static > void cancelAll( + boolean mayInterruptIfRunning, + @Nullable Future combinedFuture, + List futures + ) + { + final List allFuturesToCancel = new ArrayList<>(); + allFuturesToCancel.add(combinedFuture); + allFuturesToCancel.addAll(futures); + if (allFuturesToCancel.isEmpty()) { + return; + } + allFuturesToCancel.forEach(f -> { + try { + if (f != null) { + f.cancel(mayInterruptIfRunning); + } + } + catch (Throwable t) { + log.warn(t, "Error while cancelling future."); + } + }); + } } diff --git a/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java b/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java index 6bd764f0a5de..27bebbfa4e66 100644 --- a/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java +++ b/core/src/test/java/org/apache/druid/common/guava/GuavaUtilsTest.java @@ -20,9 +20,22 @@ package org.apache.druid.common.guava; import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.junit.Assert; import org.junit.Test; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; + public class GuavaUtilsTest { enum MyEnum @@ -53,4 +66,50 @@ public void testGetEnumIfPresent() Assert.assertEquals(MyEnum.BUCKLE_MY_SHOE, GuavaUtils.getEnumIfPresent(MyEnum.class, "BUCKLE_MY_SHOE")); Assert.assertEquals(null, GuavaUtils.getEnumIfPresent(MyEnum.class, "buckle_my_shoe")); } + + @Test + public void testCancelAll() + { + int tasks = 3; + ExecutorService service = Executors.newFixedThreadPool(tasks); + ListeningExecutorService exc = MoreExecutors.listeningDecorator(service); + AtomicInteger index = new AtomicInteger(0); + //a flag what time to throw exception. + AtomicBoolean active = new AtomicBoolean(false); + Function>> function = (taskCount) -> { + List> futures = new ArrayList<>(); + for (int i = 0; i < taskCount; i++) { + ListenableFuture future = exc.submit(new Callable() { + @Override + public Object call() throws RuntimeException + { + int internalIndex = index.incrementAndGet(); + while (true) { + if (internalIndex == taskCount && active.get()) { + //here we simulate occurs exception in some one future. + throw new RuntimeException("A big bug"); + } + } + } + }); + futures.add(future); + } + return futures; + }; + + List> futures = function.apply(tasks); + Assert.assertEquals(tasks, futures.stream().filter(f -> !f.isDone()).count()); + //here we make one of task throw exception. + active.set(true); + + ListenableFuture> future = Futures.allAsList(futures); + try { + future.get(); + } + catch (Exception e) { + Assert.assertEquals("java.lang.RuntimeException: A big bug", e.getMessage()); + GuavaUtils.cancelAll(true, future, futures); + Assert.assertEquals(0, futures.stream().filter(f -> !f.isDone()).count()); + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java index cf149138ff28..f400bacbdd1e 100644 --- a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.MergeIterable; @@ -100,7 +101,7 @@ public Sequence run(final QueryPlus queryPlus, final ResponseContext respo public Iterator make() { // Make it a List<> to materialize all of the values (so that it will submit everything to the executor) - ListenableFuture>> futures = Futures.allAsList( + List>> futures = Lists.newArrayList( Iterables.transform( queryables, @@ -141,22 +142,23 @@ public Iterable call() ); } ) - ) - ); + ); - queryWatcher.registerQueryFuture(query, futures); + ListenableFuture>> future = Futures.allAsList(futures); + queryWatcher.registerQueryFuture(query, future); try { return new MergeIterable<>( ordering.nullsFirst(), QueryContexts.hasTimeout(query) ? - futures.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) : - futures.get() + future.get(QueryContexts.getTimeout(query), TimeUnit.MILLISECONDS) : + future.get() ).iterator(); } catch (InterruptedException e) { log.noStackTrace().warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); - futures.cancel(true); + //Note: canceling combinedFuture first so that it can complete with INTERRUPTED as its final state. See ChainedExecutionQueryRunnerTest.testQueryTimeout() + GuavaUtils.cancelAll(true, future, futures); throw new QueryInterruptedException(e); } catch (CancellationException e) { @@ -164,10 +166,11 @@ public Iterable call() } catch (TimeoutException e) { log.warn("Query timeout, cancelling pending results for query id [%s]", query.getId()); - futures.cancel(true); + GuavaUtils.cancelAll(true, future, futures); throw new QueryInterruptedException(e); } catch (ExecutionException e) { + GuavaUtils.cancelAll(true, future, futures); Throwables.propagateIfPossible(e.getCause()); throw new RuntimeException(e.getCause()); } diff --git a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java index 1653fb170637..8845f2a3a36a 100644 --- a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java @@ -23,6 +23,7 @@ import com.google.common.base.Predicates; import com.google.common.base.Supplier; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; @@ -30,6 +31,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.collections.NonBlockingPool; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.data.input.Row; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; @@ -93,7 +95,7 @@ public Sequence run(final QueryPlus queryPlus, final ResponseContext respo final boolean bySegment = QueryContexts.isBySegment(query); final int priority = QueryContexts.getPriority(query); final QueryPlus threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState(); - final ListenableFuture> futures = Futures.allAsList( + final List> futures = Lists.newArrayList( Iterables.transform( queryables, @@ -136,15 +138,14 @@ public Void call() ); if (isSingleThreaded) { - waitForFutureCompletion(query, future, indexAccumulatorPair.lhs); + waitForFutureCompletion(query, ImmutableList.of(future), indexAccumulatorPair.lhs); } return future; } } ) - ) - ); + ); if (!isSingleThreaded) { waitForFutureCompletion(query, futures, indexAccumulatorPair.lhs); @@ -173,10 +174,11 @@ public T apply(Row input) private void waitForFutureCompletion( GroupByQuery query, - ListenableFuture future, + List> futures, IncrementalIndex closeOnFailure ) { + ListenableFuture> future = Futures.allAsList(futures); try { queryWatcher.registerQueryFuture(query, future); if (QueryContexts.hasTimeout(query)) { @@ -187,7 +189,7 @@ private void waitForFutureCompletion( } catch (InterruptedException e) { log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); - future.cancel(true); + GuavaUtils.cancelAll(true, future, futures); closeOnFailure.close(); throw new QueryInterruptedException(e); } @@ -198,10 +200,11 @@ private void waitForFutureCompletion( catch (TimeoutException e) { closeOnFailure.close(); log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); - future.cancel(true); + GuavaUtils.cancelAll(true, future, futures); throw new QueryInterruptedException(e); } catch (ExecutionException e) { + GuavaUtils.cancelAll(true, future, futures); closeOnFailure.close(); Throwables.propagateIfPossible(e.getCause()); throw new RuntimeException(e.getCause()); diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java index f5ff7ba627cb..73c1b64397e4 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java @@ -28,6 +28,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import org.apache.druid.collections.ReferenceCountingResourceHolder; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.java.util.common.CloseableIterators; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.parsers.CloseableIterator; @@ -339,8 +340,7 @@ private boolean isParallelizable() private List>> parallelSortAndGetGroupersIterator() { // The number of groupers is same with the number of processing threads in the executor - final ListenableFuture>>> future = Futures.allAsList( - groupers.stream() + final List>>> futures = groupers.stream() .map(grouper -> executor.submit( new AbstractPrioritizedCallable>>(priority) @@ -353,21 +353,20 @@ public CloseableIterator> call() } ) ) - .collect(Collectors.toList()) + .collect(Collectors.toList() ); + ListenableFuture>>> future = Futures.allAsList(futures); try { final long timeout = queryTimeoutAt - System.currentTimeMillis(); return hasQueryTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get(); } - catch (InterruptedException | TimeoutException e) { - future.cancel(true); - throw new QueryInterruptedException(e); - } - catch (CancellationException e) { + catch (InterruptedException | TimeoutException | CancellationException e) { + GuavaUtils.cancelAll(true, future, futures); throw new QueryInterruptedException(e); } catch (ExecutionException e) { + GuavaUtils.cancelAll(true, future, futures); throw new RuntimeException(e.getCause()); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 2de214c4c4eb..6d0563cdf519 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -34,6 +34,7 @@ import org.apache.druid.collections.BlockingPool; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.collections.Releaser; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; @@ -215,8 +216,7 @@ public CloseableGrouperIterator make() ReferenceCountingResourceHolder.fromCloseable(grouper); resources.register(grouperHolder); - ListenableFuture> futures = Futures.allAsList( - Lists.newArrayList( + List> futures = Lists.newArrayList( Iterables.transform( queryables, new Function, ListenableFuture>() @@ -259,7 +259,7 @@ public AggregateResult call() if (isSingleThreaded) { waitForFutureCompletion( query, - Futures.allAsList(ImmutableList.of(future)), + ImmutableList.of(future), hasTimeout, timeoutAt - System.currentTimeMillis() ); @@ -269,8 +269,7 @@ public AggregateResult call() } } ) - ) - ); + ); if (!isSingleThreaded) { waitForFutureCompletion(query, futures, hasTimeout, timeoutAt - System.currentTimeMillis()); @@ -339,11 +338,12 @@ private List> getMergeBuffersHolder( private void waitForFutureCompletion( GroupByQuery query, - ListenableFuture> future, + List> futures, boolean hasTimeout, long timeout ) { + ListenableFuture> future = Futures.allAsList(futures); try { if (queryWatcher != null) { queryWatcher.registerQueryFuture(query, future); @@ -357,25 +357,27 @@ private void waitForFutureCompletion( for (AggregateResult result : results) { if (!result.isOk()) { - future.cancel(true); + GuavaUtils.cancelAll(true, future, futures); throw new ResourceLimitExceededException(result.getReason()); } } } catch (InterruptedException e) { log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); - future.cancel(true); + GuavaUtils.cancelAll(true, future, futures); throw new QueryInterruptedException(e); } catch (CancellationException e) { + GuavaUtils.cancelAll(true, future, futures); throw new QueryInterruptedException(e); } catch (TimeoutException e) { log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); - future.cancel(true); + GuavaUtils.cancelAll(true, future, futures); throw new QueryInterruptedException(e); } catch (ExecutionException e) { + GuavaUtils.cancelAll(true, future, futures); throw new RuntimeException(e); } } diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java index 142f361b4e8c..896016c760f8 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java @@ -281,4 +281,41 @@ public void testInsufficientResourcesOnBroker() } } } + + @Test(timeout = 60_000L) + public void testTimeoutExceptionOnQueryable() + { + expectedException.expect(QueryInterruptedException.class); + expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class)); + + final GroupByQuery query = GroupByQuery + .builder() + .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) + .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) + .setDimensions(new DefaultDimensionSpec("quality", "alias")) + .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows")) + .setGranularity(QueryRunnerTestHelper.DAY_GRAN) + .overrideContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1)) + .build(); + + GroupByQueryRunnerFactory factory = makeQueryRunnerFactory( + GroupByQueryRunnerTest.DEFAULT_MAPPER, + new GroupByQueryConfig() + { + @Override + public String getDefaultStrategy() + { + return "v2"; + } + + @Override + public boolean isSingleThreaded() + { + return true; + } + } + ); + QueryRunner mergeRunners = factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner)); + GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query); + } } diff --git a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java index 3b7f30dac676..3c67cb016d47 100644 --- a/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java +++ b/server/src/main/java/org/apache/druid/client/cache/BackgroundCachePopulator.java @@ -27,6 +27,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import org.apache.druid.common.guava.GuavaUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; @@ -100,6 +101,7 @@ public void onSuccess(List results) @Override public void onFailure(Throwable t) { + GuavaUtils.cancelAll(true, null, cacheFutures); log.error(t, "Background caching failed"); } }, diff --git a/server/src/test/java/org/apache/druid/client/cache/BackgroundCachePopulatorTest.java b/server/src/test/java/org/apache/druid/client/cache/BackgroundCachePopulatorTest.java new file mode 100644 index 000000000000..a93d69e16bba --- /dev/null +++ b/server/src/test/java/org/apache/druid/client/cache/BackgroundCachePopulatorTest.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.client.cache; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.client.CacheUtil; +import org.apache.druid.client.CachingClusteredClientTestUtils; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.SequenceWrapper; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.CacheStrategy; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryPlus; +import org.apache.druid.query.QueryRunner; +import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.Result; +import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.query.aggregation.LongSumAggregatorFactory; +import org.apache.druid.query.context.ResponseContext; +import org.apache.druid.query.topn.TopNQueryBuilder; +import org.apache.druid.query.topn.TopNQueryConfig; +import org.apache.druid.query.topn.TopNQueryQueryToolChest; +import org.apache.druid.query.topn.TopNResultValue; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import java.io.Closeable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; + +public class BackgroundCachePopulatorTest +{ + private static final ObjectMapper JSON_MAPPER = CachingClusteredClientTestUtils.createObjectMapper(); + private static final Object[] OBJECTS = new Object[]{ + DateTimes.of("2011-01-05"), "a", 50, 4994, "b", 50, 4993, "c", 50, 4992, + DateTimes.of("2011-01-06"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, + DateTimes.of("2011-01-07"), "a", 50, 4991, "b", 50, 4990, "c", 50, 4989, + DateTimes.of("2011-01-08"), "a", 50, 4988, "b", 50, 4987, "c", 50, 4986, + DateTimes.of("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 + }; + private static final List AGGS = Arrays.asList( + new CountAggregatorFactory("rows"), + new LongSumAggregatorFactory("imps", "imps"), + new LongSumAggregatorFactory("impers", "imps") + ); + private BackgroundCachePopulator backgroundCachePopulator; + private QueryToolChest toolchest; + private Cache cache; + private Query query; + private QueryRunner baseRunner; + private AssertingClosable closable; + + @Before + public void before() + { + this.backgroundCachePopulator = new BackgroundCachePopulator( + Execs.multiThreaded(2, "CachingQueryRunnerTest-%d"), + JSON_MAPPER, + new CachePopulatorStats(), + -1 + ); + + + TopNQueryBuilder builder = new TopNQueryBuilder() + .dataSource("ds") + .dimension("top_dim") + .metric("imps") + .threshold(3) + .intervals("2011-01-05/2011-01-10") + .aggregators(AGGS) + .granularity(Granularities.ALL); + + this.query = builder.build(); + this.toolchest = new TopNQueryQueryToolChest(new TopNQueryConfig()); + List expectedRes = makeTopNResults(false, OBJECTS); + + this.closable = new AssertingClosable(); + final Sequence resultSeq = Sequences.wrap( + Sequences.simple(expectedRes), + new SequenceWrapper() + { + @Override + public void before() + { + Assert.assertFalse(closable.isClosed()); + } + + @Override + public void after(boolean isDone, Throwable thrown) + { + closable.close(); + } + } + ); + this.baseRunner = (queryPlus, responseContext) -> resultSeq; + + this.cache = new Cache() + { + private final ConcurrentMap baseMap = new ConcurrentHashMap<>(); + + @Override + public byte[] get(NamedKey key) + { + return baseMap.get(key); + } + + @Override + public void put(NamedKey key, byte[] value) + { + baseMap.put(key, value); + } + + @Override + public Map getBulk(Iterable keys) + { + return null; + } + + @Override + public void close(String namespace) + { + } + + @Override + public void close() + { + } + + @Override + public CacheStats getStats() + { + return null; + } + + @Override + public boolean isLocal() + { + return true; + } + + @Override + public void doMonitor(ServiceEmitter emitter) + { + } + }; + } + + /** + * + * Method: wrap(final Sequence sequence, final Function cacheFn, final Cache cache, final Cache.NamedKey cacheKey) + * + */ + @Test + public void testWrap() + { + String cacheId = "segment"; + SegmentDescriptor segmentDescriptor = new SegmentDescriptor(Intervals.of("2011/2012"), "version", 0); + + + CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query); + Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey( + cacheId, + segmentDescriptor, + cacheStrategy.computeCacheKey(query) + ); + + Sequence res = this.backgroundCachePopulator.wrap(this.baseRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty()), + (value) -> cacheStrategy.prepareForSegmentLevelCache().apply(value), cache, cacheKey); + Assert.assertFalse("sequence must not be closed", closable.isClosed()); + Assert.assertNull("cache must be empty", cache.get(cacheKey)); + + List results = res.toList(); + Assert.assertTrue(closable.isClosed()); + List expectedRes = makeTopNResults(false, OBJECTS); + Assert.assertEquals(expectedRes.toString(), results.toString()); + Assert.assertEquals(5, results.size()); + } + + @Test + public void testWrapOnFailure() + { + String cacheId = "segment"; + SegmentDescriptor segmentDescriptor = new SegmentDescriptor(Intervals.of("2011/2012"), "version", 0); + + + CacheStrategy cacheStrategy = toolchest.getCacheStrategy(query); + Cache.NamedKey cacheKey = CacheUtil.computeSegmentCacheKey( + cacheId, + segmentDescriptor, + cacheStrategy.computeCacheKey(query) + ); + + Sequence res = this.backgroundCachePopulator.wrap(this.baseRunner.run(QueryPlus.wrap(query), ResponseContext.createEmpty()), + (value) -> { + throw new RuntimeException("Error"); + }, cache, cacheKey); + Assert.assertFalse("sequence must not be closed", closable.isClosed()); + Assert.assertNull("cache must be empty", cache.get(cacheKey)); + + List results = res.toList(); + Assert.assertTrue(closable.isClosed()); + List expectedRes = makeTopNResults(false, OBJECTS); + Assert.assertEquals(expectedRes.toString(), results.toString()); + Assert.assertEquals(5, results.size()); + } + + + private List makeTopNResults(boolean cachedResults, Object... objects) + { + List retVal = new ArrayList<>(); + int index = 0; + while (index < objects.length) { + DateTime timestamp = (DateTime) objects[index++]; + + List> values = new ArrayList<>(); + while (index < objects.length && !(objects[index] instanceof DateTime)) { + if (objects.length - index < 3) { + throw new ISE( + "expect 3 values for each entry in the top list, had %d values left.", objects.length - index + ); + } + final double imps = ((Number) objects[index + 2]).doubleValue(); + final double rows = ((Number) objects[index + 1]).doubleValue(); + + if (cachedResults) { + values.add( + ImmutableMap.of( + "top_dim", objects[index], + "rows", rows, + "imps", imps, + "impers", imps + ) + ); + } else { + values.add( + ImmutableMap.of( + "top_dim", objects[index], + "rows", rows, + "imps", imps, + "impers", imps, + "avg_imps_per_row", imps / rows + ) + ); + } + index += 3; + } + + retVal.add(new Result<>(timestamp, new TopNResultValue(values))); + } + return retVal; + } + + private static class AssertingClosable implements Closeable + { + + private final AtomicBoolean closed = new AtomicBoolean(false); + + @Override + public void close() + { + Assert.assertFalse(closed.get()); + Assert.assertTrue(closed.compareAndSet(false, true)); + } + + public boolean isClosed() + { + return closed.get(); + } + } + + +}