diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index 33f875b770f6..8ae49c71c7ba 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -29,7 +29,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; - +import com.google.common.util.concurrent.SettableFuture; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.java.util.common.guava.BaseSequence; @@ -47,7 +47,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -152,7 +151,6 @@ public void cleanup(Iterator iterFromMake) final Collection> cacheFutures = Collections.synchronizedList(Lists.>newLinkedList()); if (populateCache) { final Function cacheFn = strategy.prepareForCache(); - final List cacheResults = Lists.newLinkedList(); return Sequences.withEffect( Sequences.map( @@ -162,17 +160,23 @@ public void cleanup(Iterator iterFromMake) @Override public T apply(final T input) { - cacheFutures.add( - backgroundExecutorService.submit( - new Runnable() - { - @Override - public void run() - { - cacheResults.add(cacheFn.apply(input)); - } + final SettableFuture future = SettableFuture.create(); + cacheFutures.add(future); + backgroundExecutorService.submit( + new Runnable() + { + @Override + public void run() + { + try { + future.set(cacheFn.apply(input)); + } + catch (Exception e) { + // if there is exception, should setException to quit the caching processing + future.setException(e); } - ) + } + } ); return input; } @@ -184,8 +188,7 @@ public void run() public void run() { try { - Futures.allAsList(cacheFutures).get(); - CacheUtil.populate(cache, mapper, key, cacheResults); + CacheUtil.populate(cache, mapper, key, Futures.allAsList(cacheFutures).get()); } catch (Exception e) { log.error(e, "Error while getting future for cache task"); diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index 5b5b53695c2b..e5b3ecd94e61 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -25,8 +25,10 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CacheStats; import io.druid.client.cache.MapCache; import io.druid.granularity.QueryGranularities; import io.druid.jackson.DefaultObjectMapper; @@ -56,6 +58,8 @@ import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.Closeable; import java.io.IOException; @@ -64,10 +68,21 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +@RunWith(Parameterized.class) public class CachingQueryRunnerTest { + @Parameterized.Parameters(name = "numBackgroundThreads={0}") + public static Iterable constructorFeeder() throws IOException + { + return QueryRunnerTestHelper.cartesian(Arrays.asList(5, 1, 0)); + } private static final List AGGS = Arrays.asList( new CountAggregatorFactory("rows"), @@ -83,6 +98,17 @@ public class CachingQueryRunnerTest new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 }; + private ExecutorService backgroundExecutorService; + + public CachingQueryRunnerTest(int numBackgroundThreads) + { + if (numBackgroundThreads > 0) { + backgroundExecutorService = Executors.newFixedThreadPool(numBackgroundThreads); + } else { + backgroundExecutorService = MoreExecutors.sameThreadExecutor(); + } + } + @Test public void testCloseAndPopulate() throws Exception { @@ -183,7 +209,52 @@ public void after(boolean isDone, Throwable thrown) throws Exception } ); - Cache cache = MapCache.create(1024 * 1024); + final CountDownLatch cacheMustBePutOnce = new CountDownLatch(1); + Cache cache = new Cache() + { + private final Map baseMap = new ConcurrentHashMap<>(); + + @Override + public byte[] get(NamedKey key) + { + return baseMap.get(key); + } + + @Override + public void put(NamedKey key, byte[] value) + { + baseMap.put(key, value); + cacheMustBePutOnce.countDown(); + } + + @Override + public Map getBulk(Iterable keys) + { + return null; + } + + @Override + public void close(String namespace) + { + } + + @Override + public CacheStats getStats() + { + return null; + } + + @Override + public boolean isLocal() + { + return true; + } + + @Override + public void doMonitor(ServiceEmitter emitter) + { + } + }; String segmentIdentifier = "segment"; SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0); @@ -203,7 +274,7 @@ public Sequence run(Query query, Map responseContext) return resultSeq; } }, - MoreExecutors.sameThreadExecutor(), + backgroundExecutorService, new CacheConfig() { @Override @@ -237,6 +308,9 @@ public boolean isUseCache() Assert.assertTrue(closable.isClosed()); Assert.assertEquals(expectedRes.toString(), results.toString()); + // wait for background caching finish + // wait at most 10 seconds to fail the test to avoid block overall tests + Assert.assertTrue("cache must be populated", cacheMustBePutOnce.await(10, TimeUnit.SECONDS)); byte[] cacheValue = cache.get(cacheKey); Assert.assertNotNull(cacheValue); @@ -293,7 +367,7 @@ public Sequence run(Query query, Map responseContext) return Sequences.empty(); } }, - MoreExecutors.sameThreadExecutor(), + backgroundExecutorService, new CacheConfig() { @Override