Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 56 additions & 38 deletions server/src/main/java/io/druid/client/cache/ByteCountingLRUMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,23 @@

package io.druid.client.cache;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.cache.Weigher;
import com.metamx.common.logger.Logger;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
*/
class ByteCountingLRUMap extends LinkedHashMap<ByteBuffer, byte[]>
*/
class ByteCountingLRUMap
{
private static final Logger log = new Logger(ByteCountingLRUMap.class);

private final com.google.common.cache.Cache<ByteBuffer, byte[]> cache;
private final boolean logEvictions;
private final int logEvictionCount;
private final long sizeInBytes;
Expand All @@ -44,22 +47,56 @@ public ByteCountingLRUMap(
final long sizeInBytes
)
{
this(16, 0, sizeInBytes);
this(16, 1, 0, sizeInBytes);
}

public ByteCountingLRUMap(
final int initialSize,
final int concurrencyLevel,
final int logEvictionCount,
final long sizeInBytes
)
{
super(initialSize, 0.75f, true);
this.logEvictionCount = logEvictionCount;
this.sizeInBytes = sizeInBytes;

logEvictions = logEvictionCount != 0;
numBytes = 0;
evictionCount = 0;

RemovalListener<ByteBuffer, byte[]> listener = new RemovalListener<ByteBuffer, byte[]>()
{
@Override
public void onRemoval(RemovalNotification<ByteBuffer, byte[]> entry)
{
++evictionCount;
if (logEvictions && evictionCount % logEvictionCount == 0) {
log.info(
"Evicting %,dth element. Size[%,d], numBytes[%,d], averageSize[%,d]",
evictionCount,
size(),
numBytes,
numBytes / size()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any guard to prevent size from being 0?

);
}
numBytes -= entry.getKey().remaining() + entry.getValue().length;
}
};
cache = CacheBuilder
.newBuilder()
.initialCapacity(initialSize)
.concurrencyLevel(concurrencyLevel)
.maximumWeight(sizeInBytes)
.removalListener(listener)
.weigher(new Weigher<ByteBuffer, byte[]>()
{
public int weigh(ByteBuffer key, byte[] value)
{
int weigh = key.remaining() + value.length;
numBytes += weigh;
return weigh;
}
}).build();
}

public long getNumBytes()
Expand All @@ -72,58 +109,39 @@ public long getEvictionCount()
return evictionCount;
}

@Override
public byte[] put(ByteBuffer key, byte[] value)
public byte[] get(ByteBuffer key)
{
numBytes += key.remaining() + value.length;
return super.put(key, value);
return cache.getIfPresent(key);
}

@Override
protected boolean removeEldestEntry(Map.Entry<ByteBuffer, byte[]> eldest)
public void put(ByteBuffer key, byte[] value)
{
if (numBytes > sizeInBytes) {
++evictionCount;
if (logEvictions && evictionCount % logEvictionCount == 0) {
log.info(
"Evicting %,dth element. Size[%,d], numBytes[%,d], averageSize[%,d]",
evictionCount,
size(),
numBytes,
numBytes / size()
);
}

numBytes -= eldest.getKey().remaining() + eldest.getValue().length;
return true;
}
return false;
cache.put(key, value);
}

@Override
public byte[] remove(Object key)
{
byte[] value = super.remove(key);
if(value != null) {
numBytes -= ((ByteBuffer)key).remaining() + value.length;
}
byte[] value = cache.getIfPresent(key);
cache.invalidate(key);
return value;
}

/**
* Don't allow key removal using the underlying keySet iterator
* All removal operations must use ByteCountingLRUMap.remove()
*/
@Override
public Set<ByteBuffer> keySet()
{
return Collections.unmodifiableSet(super.keySet());
return Collections.unmodifiableSet(cache.asMap().keySet());
}

@Override
public void clear()
{
numBytes = 0;
super.clear();
cache.invalidateAll();
}

public long size()
{
return cache.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,17 @@ public class LocalCacheProvider implements CacheProvider
@Min(0)
private int initialSize = 500000;

@JsonProperty
@Min(0)
private int concurrencyLevel = 4;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this default to the number of processing threads?


@JsonProperty
@Min(0)
private int logEvictionCount = 0;

@Override
public Cache get()
{
return new MapCache(new ByteCountingLRUMap(initialSize, logEvictionCount, sizeInBytes));
return new MapCache(new ByteCountingLRUMap(initialSize, concurrencyLevel, logEvictionCount, sizeInBytes));
}
}
11 changes: 4 additions & 7 deletions server/src/main/java/io/druid/client/cache/MapCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.metamx.emitter.service.ServiceEmitter;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -41,7 +40,6 @@ public static Cache create(long sizeInBytes)
return new MapCache(new ByteCountingLRUMap(sizeInBytes));
}

private final Map<ByteBuffer, byte[]> baseMap;
private final ByteCountingLRUMap byteCountingLRUMap;

private final Map<String, byte[]> namespaceId;
Expand All @@ -57,7 +55,6 @@ public static Cache create(long sizeInBytes)
)
{
this.byteCountingLRUMap = byteCountingLRUMap;
this.baseMap = Collections.synchronizedMap(byteCountingLRUMap);

namespaceId = Maps.newHashMap();
ids = new AtomicInteger();
Expand All @@ -82,7 +79,7 @@ public byte[] get(NamedKey key)
{
final byte[] retVal;
synchronized (clearLock) {
retVal = baseMap.get(computeKey(getNamespaceId(key.namespace), key.key));
retVal = byteCountingLRUMap.get(computeKey(getNamespaceId(key.namespace), key.key));
}
if (retVal == null) {
missCount.incrementAndGet();
Expand All @@ -96,7 +93,7 @@ public byte[] get(NamedKey key)
public void put(NamedKey key, byte[] value)
{
synchronized (clearLock) {
baseMap.put(computeKey(getNamespaceId(key.namespace), key.key), value);
byteCountingLRUMap.put(computeKey(getNamespaceId(key.namespace), key.key), value);
}
}

Expand Down Expand Up @@ -126,7 +123,7 @@ public void close(String namespace)
namespaceId.remove(namespace);
}
synchronized (clearLock) {
Iterator<ByteBuffer> iter = baseMap.keySet().iterator();
Iterator<ByteBuffer> iter = byteCountingLRUMap.keySet().iterator();
List<ByteBuffer> toRemove = Lists.newLinkedList();
while (iter.hasNext()) {
ByteBuffer next = iter.next();
Expand All @@ -139,7 +136,7 @@ public void close(String namespace)
}
}
for (ByteBuffer key : toRemove) {
baseMap.remove(key);
byteCountingLRUMap.remove(key);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ public void testSanity() throws Exception
Assert.assertEquals(ByteBuffer.wrap(eightyEightVal), ByteBuffer.wrap(map.get(tenKey)));

map.put(twoByte, oneByte.array());
assertMapValues(2, 101, 2);
Assert.assertEquals(ByteBuffer.wrap(eightyEightVal), ByteBuffer.wrap(map.get(tenKey)));
assertMapValues(1, 3, 3);
Assert.assertNull(map.get(tenKey));
Assert.assertEquals(oneByte, ByteBuffer.wrap(map.get(twoByte)));

map.put(tenKey, twoByte.array());
assertMapValues(2, 15, 3);
Assert.assertEquals(twoByte, ByteBuffer.wrap(map.get(tenKey)));
Assert.assertEquals(oneByte, ByteBuffer.wrap(map.get(twoByte)));

Iterator<ByteBuffer> it = map.keySet().iterator();
Expand All @@ -80,10 +85,10 @@ public void testSanity() throws Exception
for(ByteBuffer buf : toRemove) {
map.remove(buf);
}
assertMapValues(1, 3, 2);
assertMapValues(1, 3, 4);

map.remove(twoByte);
assertMapValues(0, 0, 2);
assertMapValues(0, 0, 5);
}

private void assertMapValues(final int size, final int numBytes, final int evictionCount)
Expand Down