Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ public Sequence<T> run(QueryPlus queryPlus, ResponseContext responseContext)

if (useResultCache && newResultSetId != null && newResultSetId.equals(existingResultSetId)) {
log.debug("Return cached result set as there is no change in identifiers for query %s ", query.getId());
// Call accumulate on the sequence to ensure that all Wrapper/Closer/Baggage/etc. get called
resultFromClient.accumulate(null, (accumulated, in) -> accumulated);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its quite unfortunate that the seq needs to be created to get newResultSetId as that is passed thru the responseContext.

actually the returned seq is an empty seq in cases when the cache supposed to be used.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I know.
The newResultSetId (ETag) is computed in CachingClusteredClient, which is many many layers down from ResultLevelCachingQueryRunner. It's not simple to compute that at this layer due to needing timeline, segmentServers, etc.
Also, the resultFromClient (Sequence) doesn't have a simple close() method so I end up having to call the accumulate. The Sequence is Mapped, Wrapped, etc. making it hard to access the resources/closers
But the Sequence retruend from CachingClusteredClient when the ETag matches (ResultLevelCache hits) is just an empty Sequence so this call should not be expensive.

return deserializeResults(cachedResultSet, strategy, existingResultSetId);
} else {
@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,30 @@
import org.apache.druid.client.cache.Cache;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.MapCache;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.apache.druid.timeline.DataSegment;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import static org.junit.Assert.fail;

public class ResultLevelCachingQueryRunnerTest extends QueryRunnerBasedOnClusteredClientTestBase
{
private Cache cache;
Expand Down Expand Up @@ -252,7 +263,7 @@ public void testPopulateCacheWhenQueryThrowExceptionShouldNotCache()
);
try {
sequence.toList();
Assert.fail("Expected to throw an exception");
fail("Expected to throw an exception");
}
catch (RuntimeException e) {
Assert.assertEquals("Exception for testing", e.getMessage());
Expand All @@ -264,6 +275,71 @@ public void testPopulateCacheWhenQueryThrowExceptionShouldNotCache()
}
}

@Test
public void testUseCacheAndReleaseResourceFromClient()
{
final BlockingPool<ByteBuffer> mergePool = new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1), 1);
prepareCluster(10);
final Query<Result<TimeseriesResultValue>> query = timeseriesQuery(BASE_SCHEMA_INFO.getDataInterval());
CacheConfig cacheConfig = newCacheConfig(true, true, DEFAULT_CACHE_ENTRY_MAX_SIZE);
final QueryRunner<Result<TimeseriesResultValue>> baseRunner = cachingClusteredClient.getQueryRunnerForIntervals(query, query.getIntervals());
RetryQueryRunner<Result<TimeseriesResultValue>> spyRunner = Mockito.spy(new RetryQueryRunner<>(
baseRunner,
cachingClusteredClient::getQueryRunnerForSegments,
new RetryQueryRunnerConfig(),
objectMapper
));
Mockito.doAnswer((Answer<Object>) invocation -> {
List<ReferenceCountingResourceHolder<ByteBuffer>> resoruce = mergePool.takeBatch(1, 1);
if (resoruce.isEmpty()) {
fail("Resource should not be empty");
}
Sequence<Result<TimeseriesResultValue>> realSequence = (Sequence<Result<TimeseriesResultValue>>) invocation.callRealMethod();
Closer closer = Closer.create();
closer.register(() -> resoruce.forEach(ReferenceCountingResourceHolder::close));
return Sequences.withBaggage(realSequence, closer);
}).when(spyRunner).run(ArgumentMatchers.any(), ArgumentMatchers.any());

final ResultLevelCachingQueryRunner<Result<TimeseriesResultValue>> queryRunner1 = new ResultLevelCachingQueryRunner<>(
spyRunner,
conglomerate.getToolChest(query),
query,
objectMapper,
cache,
cacheConfig
);

final Sequence<Result<TimeseriesResultValue>> sequence1 = queryRunner1.run(
QueryPlus.wrap(query),
responseContext()
);
final List<Result<TimeseriesResultValue>> results1 = sequence1.toList();
Assert.assertEquals(0, cache.getStats().getNumHits());
Assert.assertEquals(1, cache.getStats().getNumEntries());
Assert.assertEquals(1, cache.getStats().getNumMisses());


final Sequence<Result<TimeseriesResultValue>> sequence2 = queryRunner1.run(
QueryPlus.wrap(query),
responseContext()
);
final List<Result<TimeseriesResultValue>> results2 = sequence2.toList();
Assert.assertEquals(results1, results2);
Assert.assertEquals(1, cache.getStats().getNumHits());
Assert.assertEquals(1, cache.getStats().getNumEntries());
Assert.assertEquals(1, cache.getStats().getNumMisses());

final Sequence<Result<TimeseriesResultValue>> sequence3 = queryRunner1.run(
QueryPlus.wrap(query),
responseContext()
);
Comment thread Fixed
final List<Result<TimeseriesResultValue>> results3 = sequence3.toList();
Assert.assertEquals(results1, results3);
Assert.assertEquals(2, cache.getStats().getNumHits());
Assert.assertEquals(1, cache.getStats().getNumEntries());
Assert.assertEquals(1, cache.getStats().getNumMisses());
}

private <T> ResultLevelCachingQueryRunner<T> createQueryRunner(
CacheConfig cacheConfig,
Query<T> query
Expand Down
Loading