Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions server/src/main/java/io/druid/client/CachingQueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import com.google.common.util.concurrent.SettableFuture;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.java.util.common.guava.BaseSequence;
Expand All @@ -47,7 +47,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -152,7 +151,6 @@ public void cleanup(Iterator<T> iterFromMake)
final Collection<ListenableFuture<?>> cacheFutures = Collections.synchronizedList(Lists.<ListenableFuture<?>>newLinkedList());
if (populateCache) {
final Function cacheFn = strategy.prepareForCache();
final List<Object> cacheResults = Lists.newLinkedList();

return Sequences.withEffect(
Sequences.map(
Expand All @@ -162,17 +160,23 @@ public void cleanup(Iterator<T> iterFromMake)
@Override
public T apply(final T input)
{
cacheFutures.add(
backgroundExecutorService.submit(
new Runnable()
{
@Override
public void run()
{
cacheResults.add(cacheFn.apply(input));
}
final SettableFuture<Object> future = SettableFuture.create();
cacheFutures.add(future);
backgroundExecutorService.submit(
new Runnable()
{
@Override
public void run()
{
try {
future.set(cacheFn.apply(input));
}
catch (Exception e) {
// if there is exception, should setException to quit the caching processing
future.setException(e);
}
)
}
}
);
return input;
}
Expand All @@ -184,8 +188,7 @@ public void run()
public void run()
{
try {
Futures.allAsList(cacheFutures).get();
CacheUtil.populate(cache, mapper, key, cacheResults);
CacheUtil.populate(cache, mapper, key, Futures.allAsList(cacheFutures).get());
}
catch (Exception e) {
log.error(e, "Error while getting future for cache task");
Expand Down
80 changes: 77 additions & 3 deletions server/src/test/java/io/druid/client/CachingQueryRunnerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.CacheStats;
import io.druid.client.cache.MapCache;
import io.druid.granularity.QueryGranularities;
import io.druid.jackson.DefaultObjectMapper;
Expand Down Expand Up @@ -56,6 +58,8 @@
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.Closeable;
import java.io.IOException;
Expand All @@ -64,10 +68,21 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@RunWith(Parameterized.class)
public class CachingQueryRunnerTest
{
@Parameterized.Parameters(name = "numBackgroundThreads={0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
{
return QueryRunnerTestHelper.cartesian(Arrays.asList(5, 1, 0));
}

private static final List<AggregatorFactory> AGGS = Arrays.asList(
new CountAggregatorFactory("rows"),
Expand All @@ -83,6 +98,17 @@ public class CachingQueryRunnerTest
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
};

private ExecutorService backgroundExecutorService;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add a new line after this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modefied, @KurtYoung


public CachingQueryRunnerTest(int numBackgroundThreads)
{
if (numBackgroundThreads > 0) {
backgroundExecutorService = Executors.newFixedThreadPool(numBackgroundThreads);
} else {
backgroundExecutorService = MoreExecutors.sameThreadExecutor();
}
}

@Test
public void testCloseAndPopulate() throws Exception
{
Expand Down Expand Up @@ -183,7 +209,52 @@ public void after(boolean isDone, Throwable thrown) throws Exception
}
);

Cache cache = MapCache.create(1024 * 1024);
final CountDownLatch cacheMustBePutOnce = new CountDownLatch(1);
Cache cache = new Cache()
{
private final Map<NamedKey, byte[]> baseMap = new ConcurrentHashMap<>();

@Override
public byte[] get(NamedKey key)
{
return baseMap.get(key);
}

@Override
public void put(NamedKey key, byte[] value)
{
baseMap.put(key, value);
cacheMustBePutOnce.countDown();
}

@Override
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
return null;
}

@Override
public void close(String namespace)
{
}

@Override
public CacheStats getStats()
{
return null;
}

@Override
public boolean isLocal()
{
return true;
}

@Override
public void doMonitor(ServiceEmitter emitter)
{
}
};

String segmentIdentifier = "segment";
SegmentDescriptor segmentDescriptor = new SegmentDescriptor(new Interval("2011/2012"), "version", 0);
Expand All @@ -203,7 +274,7 @@ public Sequence run(Query query, Map responseContext)
return resultSeq;
}
},
MoreExecutors.sameThreadExecutor(),
backgroundExecutorService,
new CacheConfig()
{
@Override
Expand Down Expand Up @@ -237,6 +308,9 @@ public boolean isUseCache()
Assert.assertTrue(closable.isClosed());
Assert.assertEquals(expectedRes.toString(), results.toString());

// wait for background caching finish
// wait at most 10 seconds to fail the test to avoid block overall tests
Assert.assertTrue("cache must be populated", cacheMustBePutOnce.await(10, TimeUnit.SECONDS));
byte[] cacheValue = cache.get(cacheKey);
Assert.assertNotNull(cacheValue);

Expand Down Expand Up @@ -293,7 +367,7 @@ public Sequence run(Query query, Map responseContext)
return Sequences.empty();
}
},
MoreExecutors.sameThreadExecutor(),
backgroundExecutorService,
new CacheConfig()
{
@Override
Expand Down