From d011d91fb73af90ee4a6b8ed119db69b777e2983 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Thu, 16 Feb 2017 21:31:34 +0800 Subject: [PATCH 1/3] fix cache populate incorrect content when numBackgroundThreads>1 --- .../io/druid/client/CachingQueryRunner.java | 50 +++++++++++++------ .../druid/client/CachingQueryRunnerTest.java | 26 +++++++++- 2 files changed, 58 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index 33f875b770f6..838137fc2d90 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -23,13 +23,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; - +import com.google.common.util.concurrent.SettableFuture; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; import io.druid.java.util.common.guava.BaseSequence; @@ -47,7 +47,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -152,7 +151,6 @@ public void cleanup(Iterator iterFromMake) final Collection> cacheFutures = Collections.synchronizedList(Lists.>newLinkedList()); if (populateCache) { final Function cacheFn = strategy.prepareForCache(); - final List cacheResults = Lists.newLinkedList(); return Sequences.withEffect( Sequences.map( @@ -162,17 +160,23 @@ public void cleanup(Iterator iterFromMake) @Override public T apply(final T input) { - cacheFutures.add( - backgroundExecutorService.submit( - new Runnable() - { - @Override - public void run() - { - cacheResults.add(cacheFn.apply(input)); - } + final SettableFuture future = SettableFuture.create(); + cacheFutures.add(future); + backgroundExecutorService.submit( + new Runnable() + { + @Override + public void run() + { + try { + future.set(cacheFn.apply(input)); + } + catch (Exception e) { + // if there is exception, should setException to quit the caching processing + future.setException(e); } - ) + } + } ); return input; } @@ -184,8 +188,22 @@ public void run() public void run() { try { - Futures.allAsList(cacheFutures).get(); - CacheUtil.populate(cache, mapper, key, cacheResults); + CacheUtil.populate(cache, mapper, key, Iterables.transform( + cacheFutures, + new Function, Object>() + { + @Override + public Object apply(ListenableFuture input) + { + try { + return input.get(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + )); } catch (Exception e) { log.error(e, "Error while getting future for cache task"); diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index 5b5b53695c2b..b0aa50a500d0 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -56,6 +56,8 @@ import org.joda.time.Interval; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.Closeable; import java.io.IOException; @@ -64,10 +66,18 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +@RunWith(Parameterized.class) public class CachingQueryRunnerTest { + @Parameterized.Parameters(name="numBackgroundThreads={0}") + public static Iterable constructorFeeder() throws IOException + { + return QueryRunnerTestHelper.cartesian(Arrays.asList(5, 1, 0)); + } private static final List AGGS = Arrays.asList( new CountAggregatorFactory("rows"), @@ -83,6 +93,16 @@ public class CachingQueryRunnerTest new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983 }; + private ExecutorService backgroundExecutorService; + public CachingQueryRunnerTest(int numBackgroundThreads) + { + if (numBackgroundThreads > 0) { + backgroundExecutorService = Executors.newFixedThreadPool(numBackgroundThreads); + } else { + backgroundExecutorService = MoreExecutors.sameThreadExecutor(); + } + } + @Test public void testCloseAndPopulate() throws Exception { @@ -203,7 +223,7 @@ public Sequence run(Query query, Map responseContext) return resultSeq; } }, - MoreExecutors.sameThreadExecutor(), + backgroundExecutorService, new CacheConfig() { @Override @@ -237,6 +257,8 @@ public boolean isUseCache() Assert.assertTrue(closable.isClosed()); Assert.assertEquals(expectedRes.toString(), results.toString()); + // wait for background caching finish + Thread.sleep(500); byte[] cacheValue = cache.get(cacheKey); Assert.assertNotNull(cacheValue); @@ -293,7 +315,7 @@ public Sequence run(Query query, Map responseContext) return Sequences.empty(); } }, - MoreExecutors.sameThreadExecutor(), + backgroundExecutorService, new CacheConfig() { @Override From 55a9f04b7ecec54a53340e87cb6d995af5a85b22 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Fri, 17 Feb 2017 10:31:24 +0800 Subject: [PATCH 2/3] simplify code by using Futures.allAsList and use CountDownLatch in UT --- .../io/druid/client/CachingQueryRunner.java | 19 +------ .../druid/client/CachingQueryRunnerTest.java | 54 ++++++++++++++++++- 2 files changed, 54 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/io/druid/client/CachingQueryRunner.java b/server/src/main/java/io/druid/client/CachingQueryRunner.java index 838137fc2d90..8ae49c71c7ba 100644 --- a/server/src/main/java/io/druid/client/CachingQueryRunner.java +++ b/server/src/main/java/io/druid/client/CachingQueryRunner.java @@ -23,9 +23,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Throwables; -import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -188,22 +188,7 @@ public void run() public void run() { try { - CacheUtil.populate(cache, mapper, key, Iterables.transform( - cacheFutures, - new Function, Object>() - { - @Override - public Object apply(ListenableFuture input) - { - try { - return input.get(); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - } - )); + CacheUtil.populate(cache, mapper, key, Futures.allAsList(cacheFutures).get()); } catch (Exception e) { log.error(e, "Error while getting future for cache task"); diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index b0aa50a500d0..f14da3b890de 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -25,8 +25,10 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.cache.Cache; import io.druid.client.cache.CacheConfig; +import io.druid.client.cache.CacheStats; import io.druid.client.cache.MapCache; import io.druid.granularity.QueryGranularities; import io.druid.jackson.DefaultObjectMapper; @@ -66,8 +68,11 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @RunWith(Parameterized.class) @@ -203,7 +208,51 @@ public void after(boolean isDone, Throwable thrown) throws Exception } ); - Cache cache = MapCache.create(1024 * 1024); + final CountDownLatch cacheMustBePutOnce = new CountDownLatch(1); + Cache cache = new Cache() { + private final Map baseMap = new ConcurrentHashMap<>(); + + @Override + public byte[] get(NamedKey key) + { + return baseMap.get(key); + } + + @Override + public void put(NamedKey key, byte[] value) + { + baseMap.put(key, value); + cacheMustBePutOnce.countDown(); + } + + @Override + public Map getBulk(Iterable keys) + { + return null; + } + + @Override + public void close(String namespace) + { + } + + @Override + public CacheStats getStats() + { + return null; + } + + @Override + public boolean isLocal() + { + return true; + } + + @Override + public void doMonitor(ServiceEmitter emitter) + { + } + }; String segmentIdentifier = "segment"; SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0); @@ -258,7 +307,8 @@ public boolean isUseCache() Assert.assertEquals(expectedRes.toString(), results.toString()); // wait for background caching finish - Thread.sleep(500); + // wait at most 10 seconds to fail the test to avoid block overall tests + cacheMustBePutOnce.await(10, TimeUnit.SECONDS); byte[] cacheValue = cache.get(cacheKey); Assert.assertNotNull(cacheValue); From d2d053efe86f84d4f09517726bb9b77147a23388 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Fri, 17 Feb 2017 11:17:10 +0800 Subject: [PATCH 3/3] fix test code style and assert countDownLatch.await() --- .../test/java/io/druid/client/CachingQueryRunnerTest.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java index f14da3b890de..e5b3ecd94e61 100644 --- a/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java +++ b/server/src/test/java/io/druid/client/CachingQueryRunnerTest.java @@ -78,7 +78,7 @@ @RunWith(Parameterized.class) public class CachingQueryRunnerTest { - @Parameterized.Parameters(name="numBackgroundThreads={0}") + @Parameterized.Parameters(name = "numBackgroundThreads={0}") public static Iterable constructorFeeder() throws IOException { return QueryRunnerTestHelper.cartesian(Arrays.asList(5, 1, 0)); @@ -99,6 +99,7 @@ public static Iterable constructorFeeder() throws IOException }; private ExecutorService backgroundExecutorService; + public CachingQueryRunnerTest(int numBackgroundThreads) { if (numBackgroundThreads > 0) { @@ -209,7 +210,8 @@ public void after(boolean isDone, Throwable thrown) throws Exception ); final CountDownLatch cacheMustBePutOnce = new CountDownLatch(1); - Cache cache = new Cache() { + Cache cache = new Cache() + { private final Map baseMap = new ConcurrentHashMap<>(); @Override @@ -308,7 +310,7 @@ public boolean isUseCache() // wait for background caching finish // wait at most 10 seconds to fail the test to avoid block overall tests - cacheMustBePutOnce.await(10, TimeUnit.SECONDS); + Assert.assertTrue("cache must be populated", cacheMustBePutOnce.await(10, TimeUnit.SECONDS)); byte[] cacheValue = cache.get(cacheKey); Assert.assertNotNull(cacheValue);