Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions server/src/main/java/io/druid/client/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,30 +30,30 @@
*/
public interface Cache
{
public byte[] get(NamedKey key);
public void put(NamedKey key, byte[] value);
byte[] get(NamedKey key);
void put(NamedKey key, byte[] value);

/**
* Resulting map should not contain any null values (i.e. cache misses should not be included)
*
* @param keys
* @return
*/
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys);
Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys);

public void close(String namespace);
void close(String namespace);

public CacheStats getStats();
CacheStats getStats();

public boolean isLocal();
boolean isLocal();

/**
* Custom metrics not covered by CacheStats may be emitted by this method.
* @param emitter The service emitter to emit on.
*/
public void doMonitor(ServiceEmitter emitter);
void doMonitor(ServiceEmitter emitter);

public class NamedKey
class NamedKey
{
final public String namespace;
final public byte[] key;
Expand Down
62 changes: 58 additions & 4 deletions server/src/main/java/io/druid/client/cache/LocalCacheProvider.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@

package io.druid.client.cache;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.query.DruidProcessingConfig;

import javax.validation.constraints.Min;
import java.util.Map;

/**
*/
public class LocalCacheProvider implements CacheProvider
{
public static final NoopCache NOOP_CACHE = new NoopCache();

@JsonProperty
@Min(0)
private long sizeInBytes = 0;
Expand All @@ -33,13 +40,60 @@ public class LocalCacheProvider implements CacheProvider
@Min(0)
private int initialSize = 500000;

@JsonProperty
@Min(0)
private int logEvictionCount = 0;
@JacksonInject
private DruidProcessingConfig config = null;

@Override
public Cache get()
{
return new MapCache(new ByteCountingLRUMap(initialSize, logEvictionCount, sizeInBytes));
if(sizeInBytes > 0) {
return MapCache.create(sizeInBytes, initialSize, config.getNumThreads());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you set the concurrency to the number of threads, the likelihood of having a collision in the striped locking is pretty high if they happen to each request a random lock. Can we make the concurrency higher than the number of threads?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I've read, for most use cases, setting a concurrency level to the number of threads should be enough, or do you have examples that show otherwise?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as per #1849 (comment) only in java 7 and earlier.

} else {
return NOOP_CACHE;
}
}

private static class NoopCache implements Cache
{
/**
 * Looks up nothing: this implementation unconditionally returns {@code null}
 * for every key, so callers always observe a cache miss.
 *
 * @param key the key to look up (ignored)
 * @return always {@code null}
 */
@Override
public byte[] get(NamedKey key)
{
return null;
}

/**
 * Does nothing: the body is empty, so the supplied value is discarded.
 *
 * @param key   the key the value would be stored under (ignored)
 * @param value the value to cache (ignored)
 */
@Override
public void put(NamedKey key, byte[] value)
{
}

@Override
public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
{
return ImmutableMap.of();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be a static ImmutableMap?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ImmutableMap.of() already returns a shared static instance.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh nice

}

/**
 * Does nothing: the body is empty, so closing a namespace has no effect here.
 *
 * @param namespace the namespace to close (ignored)
 */
@Override
public void close(String namespace)
{

}

/**
 * Builds a fresh {@code CacheStats} on every call with all seven constructor
 * arguments set to zero.
 *
 * @return a new all-zero {@code CacheStats}
 */
@Override
public CacheStats getStats()
{
return new CacheStats(0,0,0,0,0,0,0);
}

/**
 * Reports this cache as local.
 *
 * @return always {@code true}
 */
@Override
public boolean isLocal()
{
return true;
}

/**
 * Does nothing: the body is empty, so no metrics are emitted.
 *
 * @param emitter the service emitter (ignored)
 */
@Override
public void doMonitor(ServiceEmitter emitter)
{

}
}
}
139 changes: 60 additions & 79 deletions server/src/main/java/io/druid/client/cache/MapCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,59 +17,77 @@

package io.druid.client.cache;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.Weigher;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.emitter.service.ServiceEmitter;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
*/
public class MapCache implements Cache
{
public static Cache create(long sizeInBytes)
public static MapCache create(long sizeInBytes) {
return create(sizeInBytes, 0, 4);
}
public static MapCache create(long sizeInBytes, int initialCapacity, int concurrencyLevel)
{
return new MapCache(new ByteCountingLRUMap(sizeInBytes));
return new MapCache(sizeInBytes, initialCapacity, concurrencyLevel);
}

private final Map<ByteBuffer, byte[]> baseMap;
private final ByteCountingLRUMap byteCountingLRUMap;

private final Map<String, byte[]> namespaceId;
private final AtomicInteger ids;

private final Object clearLock = new Object();
private final com.google.common.cache.Cache<ByteBuffer, byte[]> baseCache;
private final com.google.common.cache.LoadingCache<String, Integer> namespaceId;
private final AtomicInteger ids = new AtomicInteger();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're doing increments can this start at Integer.MIN_VALUE?


private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);

MapCache(
ByteCountingLRUMap byteCountingLRUMap
)
private MapCache(long sizeInBytes, int initialCapacity, int concurrencyLevel)
{
this.byteCountingLRUMap = byteCountingLRUMap;
this.baseMap = Collections.synchronizedMap(byteCountingLRUMap);

namespaceId = Maps.newHashMap();
ids = new AtomicInteger();
this.baseCache = CacheBuilder
.newBuilder()
.initialCapacity(initialCapacity)
.maximumWeight(sizeInBytes)
.recordStats()
.concurrencyLevel(concurrencyLevel)
.weigher(
new Weigher<ByteBuffer, byte[]>()
{
@Override
public int weigh(ByteBuffer key, byte[] value)
{
return key.remaining() + value.length;
}
}
)
.build();

this.namespaceId = CacheBuilder
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If my understanding is correct, there is no eviction on this cache. If so, I don't see why we shouldn't use a simple concurrent hash map.

.newBuilder()
.concurrencyLevel(concurrencyLevel)
.build(new CacheLoader<String, Integer>()
{
@Override
public Integer load(String namespace) throws Exception
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If no checked exception is thrown, I think you don't need `throws Exception`.

{
return ids.getAndIncrement();
}
});
}

@Override
public CacheStats getStats()
{
final com.google.common.cache.CacheStats stats = baseCache.stats();
return new CacheStats(
hitCount.get(),
missCount.get(),
byteCountingLRUMap.size(),
byteCountingLRUMap.getNumBytes(),
byteCountingLRUMap.getEvictionCount(),
stats.hitCount(),
stats.missCount(),
baseCache.size(),
-1,
stats.evictionCount(),
0,
0
);
Expand All @@ -78,24 +96,13 @@ public CacheStats getStats()
@Override
public byte[] get(NamedKey key)
{
final byte[] retVal;
synchronized (clearLock) {
retVal = baseMap.get(computeKey(getNamespaceId(key.namespace), key.key));
}
if (retVal == null) {
missCount.incrementAndGet();
} else {
hitCount.incrementAndGet();
}
return retVal;
return baseCache.getIfPresent(computeKey(namespaceId.getUnchecked(key.namespace), key.key));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a race here between get and close, whereby two threads may each execute get xor close and leave namespaceId polluted with a namespace that is assumed closed? (It is not clear that this race is absent from the existing impl.)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comment above.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getIfPresent can return null; it is a good habit to use the @Nullable annotation.

}

@Override
public void put(NamedKey key, byte[] value)
{
synchronized (clearLock) {
baseMap.put(computeKey(getNamespaceId(key.namespace), key.key), value);
}
baseCache.put(computeKey(namespaceId.getUnchecked(key.namespace), key.key), value);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To prevent race conditions on namespaceId, the namespaceId can be stored, and then the asMap() representation of namespaceId can be used to check whether the namespaceId after baseCache.put is the same as when get was first called. If it is, then things are safe; if it is not, then the cache entry just populated should be removed.

}

@Override
Expand All @@ -114,51 +121,25 @@ public Map<NamedKey, byte[]> getBulk(Iterable<NamedKey> keys)
@Override
public void close(String namespace)
{
byte[] idBytes;
synchronized (namespaceId) {
idBytes = getNamespaceId(namespace);
if (idBytes == null) {
return;
}

namespaceId.remove(namespace);
}
synchronized (clearLock) {
Iterator<ByteBuffer> iter = baseMap.keySet().iterator();
final Integer id = namespaceId.asMap().remove(namespace);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prior impl had a guard against races between put/get and close.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking of just having things expire in the namespaceId cache as well.
I don't think it's worth adding back lock contention on close.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would have the potential to waste memory temporarily. Since it would auto-recover after a time, I agree it's probably fine to just let the LRU get rid of it over time.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This call will return an entire map. I think you can use getIfPresent and then invalidate(key), and avoid building the map every time you have to clean up after a namespace.

if (id != null) {
List<ByteBuffer> toRemove = Lists.newLinkedList();
while (iter.hasNext()) {
ByteBuffer next = iter.next();

if (next.get(0) == idBytes[0]
&& next.get(1) == idBytes[1]
&& next.get(2) == idBytes[2]
&& next.get(3) == idBytes[3]) {
final int unboxed = id;
for (ByteBuffer next : baseCache.asMap().keySet()) {
if (next.getInt(0) == unboxed) {
toRemove.add(next);
}
}
for (ByteBuffer key : toRemove) {
baseMap.remove(key);
}
}
}

private byte[] getNamespaceId(final String identifier)
{
synchronized (namespaceId) {
byte[] idBytes = namespaceId.get(identifier);
if (idBytes != null) {
return idBytes;
}

idBytes = Ints.toByteArray(ids.getAndIncrement());
namespaceId.put(identifier, idBytes);
return idBytes;
baseCache.invalidateAll(toRemove);
}
}

private ByteBuffer computeKey(byte[] idBytes, byte[] key)
private ByteBuffer computeKey(int id, byte[] key)
{
final ByteBuffer retVal = ByteBuffer.allocate(key.length + 4).put(idBytes).put(key);
final ByteBuffer retVal = ByteBuffer
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we're doing putInt we need to specify the byte order.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's big-endian by default

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found the docs, sounds good

.allocate(key.length + 4)
.putInt(id)
.put(key);
retVal.rewind();
return retVal;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public void configure(Binder binder)
@Test
public void testSanity() throws Exception
{
final MapCache l1 = new MapCache(new ByteCountingLRUMap(1024 * 1024));
final MapCache l2 = new MapCache(new ByteCountingLRUMap(1024 * 1024));
final MapCache l1 = MapCache.create(1024 * 1024);
final MapCache l2 = MapCache.create(1024 * 1024);
HybridCache cache = new HybridCache(l1, l2);

final Cache.NamedKey key1 = new Cache.NamedKey("a", HI);
Expand Down
16 changes: 7 additions & 9 deletions server/src/test/java/io/druid/client/cache/MapCacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,45 +28,43 @@ public class MapCacheTest
{
private static final byte[] HI = "hi".getBytes();
private static final byte[] HO = "ho".getBytes();
private ByteCountingLRUMap baseMap;
private MapCache cache;

@Before
public void setUp() throws Exception
{
baseMap = new ByteCountingLRUMap(1024 * 1024);
cache = new MapCache(baseMap);
cache = MapCache.create(1024 * 1024);
}

@Test
public void testSanity() throws Exception
{
Assert.assertNull(cache.get(new Cache.NamedKey("a", HI)));
Assert.assertEquals(0, baseMap.size());
Assert.assertEquals(0, cache.getStats().getNumEntries());
put(cache, "a", HI, 1);
Assert.assertEquals(1, baseMap.size());
Assert.assertEquals(1, cache.getStats().getNumEntries());
Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("the", HI)));

put(cache, "the", HI, 2);
Assert.assertEquals(2, baseMap.size());
Assert.assertEquals(2, cache.getStats().getNumEntries());
Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertEquals(2, get(cache, "the", HI));

put(cache, "the", HO, 10);
Assert.assertEquals(3, baseMap.size());
Assert.assertEquals(3, cache.getStats().getNumEntries());
Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));
Assert.assertEquals(2, get(cache, "the", HI));
Assert.assertEquals(10, get(cache, "the", HO));

cache.close("the");
Assert.assertEquals(1, baseMap.size());
Assert.assertEquals(1, cache.getStats().getNumEntries());
Assert.assertEquals(1, get(cache, "a", HI));
Assert.assertNull(cache.get(new Cache.NamedKey("a", HO)));

cache.close("a");
Assert.assertEquals(0, baseMap.size());
Assert.assertEquals(0, cache.getStats().getNumEntries());
}

public void put(Cache cache, String namespace, byte[] key, Integer value)
Expand Down