Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,6 @@ private void emitStats(
emitter.emit(builder.build(String.format("%s/hitRate", metricPrefix), cacheStats.hitRate()));
emitter.emit(builder.build(String.format("%s/averageBytes", metricPrefix), cacheStats.averageBytes()));
emitter.emit(builder.build(String.format("%s/timeouts", metricPrefix), cacheStats.getNumTimeouts()));
emitter.emit(builder.build(String.format("%s/errors", metricPrefix), cacheStats.getNumErrors()));
}
}
13 changes: 11 additions & 2 deletions client/src/main/java/com/metamx/druid/client/cache/CacheStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ public class CacheStats
private final long sizeInBytes;
private final long numEvictions;
private final long numTimeouts;
private final long numErrors;

public CacheStats(
long numHits,
long numMisses,
long size,
long sizeInBytes,
long numEvictions,
long numTimeouts
long numTimeouts,
long numErrors
)
{
this.numHits = numHits;
Expand All @@ -45,6 +47,7 @@ public CacheStats(
this.sizeInBytes = sizeInBytes;
this.numEvictions = numEvictions;
this.numTimeouts = numTimeouts;
this.numErrors = numErrors;
}

public long getNumHits()
Expand Down Expand Up @@ -77,6 +80,11 @@ public long getNumTimeouts()
return numTimeouts;
}

public long getNumErrors()
{
return numErrors;
}

public long numLookups()
{
return numHits + numMisses;
Expand Down Expand Up @@ -104,7 +112,8 @@ public CacheStats delta(CacheStats oldStats)
size - oldStats.size,
sizeInBytes - oldStats.sizeInBytes,
numEvictions - oldStats.numEvictions,
numTimeouts - oldStats.numTimeouts
numTimeouts - oldStats.numTimeouts,
numErrors - oldStats.numErrors
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public CacheStats getStats()
byteCountingLRUMap.size(),
byteCountingLRUMap.getNumBytes(),
byteCountingLRUMap.getEvictionCount(),
0,
0
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceEventBuilder;
import net.spy.memcached.AddrUtil;
import net.spy.memcached.ConnectionFactoryBuilder;
import net.spy.memcached.DefaultHashAlgorithm;
Expand All @@ -47,6 +50,8 @@

public class MemcachedCache implements Cache
{
private static final Logger log = new Logger(MemcachedCache.class);

public static MemcachedCache create(final MemcachedCacheConfig config)
{
try {
Expand All @@ -60,9 +65,11 @@ public static MemcachedCache create(final MemcachedCacheConfig config)
.setHashAlg(DefaultHashAlgorithm.FNV1A_64_HASH)
.setLocatorType(ConnectionFactoryBuilder.Locator.CONSISTENT)
.setDaemon(true)
.setFailureMode(FailureMode.Retry)
.setFailureMode(FailureMode.Cancel)
.setTranscoder(transcoder)
.setShouldOptimize(true)
.setOpQueueMaxBlockTime(config.getTimeout())
.setOpTimeout(config.getTimeout())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This technically means that at the limit, we could experience timeout * 2 delay, correct? I.e. it could wait on the OpQueue right until timeout, when it gets taken out and runs, only to wait until timeout on the actual operation. This is not necessarily a problem, just want to understand if these are additive or if the thing is intelligent about taking into account both waits.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct, this would mean it could take up to 1 second for an operation to complete, given that it will wait up to 500ms to get the operation in the queue and then up to 500ms for it to complete. Before it would wait up to 10s to get a slot in the queue.

From reading the spymemcached source code, it appears opTimeout is only used when doing synchronous gets / sets. In our case we set that timeout explicitely on the asyncGet call. I'm just setting it globally here in case it happens to affect anything else I am not aware of.

.build(),
AddrUtil.getAddresses(config.getHosts())
),
Expand All @@ -84,6 +91,7 @@ public static MemcachedCache create(final MemcachedCacheConfig config)
private final AtomicLong hitCount = new AtomicLong(0);
private final AtomicLong missCount = new AtomicLong(0);
private final AtomicLong timeoutCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);

MemcachedCache(MemcachedClientIF client, String memcachedPrefix, int timeout, int expiration) {
Preconditions.checkArgument(memcachedPrefix.length() <= MAX_PREFIX_LENGTH,
Expand All @@ -105,14 +113,23 @@ public CacheStats getStats()
0,
0,
0,
timeoutCount.get()
timeoutCount.get(),
errorCount.get()
);
}

@Override
public byte[] get(NamedKey key)
{
Future<Object> future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
Future<Object> future;
try {
future = client.asyncGet(computeKeyHash(memcachedPrefix, key));
} catch(IllegalStateException e) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are the chances that an IllegalStateException is thrown from computeKeyHash? Should we have some extra checks to make sure this is actually an issue with queueing, or are we reasonably confident that it is?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty confident computeKeyHash shouldn't throw that kind of exception. If anything it would throw a DigestException.

// operation did not get queued in time (queue is full)
errorCount.incrementAndGet();
log.warn(e, "Unable to queue cache operation");
return null;
}
try {
byte[] bytes = (byte[]) future.get(timeout, TimeUnit.MILLISECONDS);
if(bytes != null) {
Expand All @@ -133,14 +150,22 @@ public byte[] get(NamedKey key)
throw Throwables.propagate(e);
}
catch(ExecutionException e) {
throw Throwables.propagate(e);
errorCount.incrementAndGet();
log.warn(e, "Exception pulling item from cache");
return null;
}
}

@Override
public void put(NamedKey key, byte[] value)
{
client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
try {
client.set(computeKeyHash(memcachedPrefix, key), expiration, serializeValue(key, value));
} catch(IllegalStateException e) {
// operation did not get queued in time (queue is full)
errorCount.incrementAndGet();
log.warn(e, "Unable to queue cache operation");
}
}

private static byte[] serializeValue(NamedKey key, byte[] value) {
Expand Down Expand Up @@ -183,7 +208,17 @@ public String apply(
}
);

BulkFuture<Map<String, Object>> future = client.asyncGetBulk(keyLookup.keySet());
Map<NamedKey, byte[]> results = Maps.newHashMap();

BulkFuture<Map<String, Object>> future;
try {
future = client.asyncGetBulk(keyLookup.keySet());
} catch(IllegalStateException e) {
// operation did not get queued in time (queue is full)
errorCount.incrementAndGet();
log.warn(e, "Unable to queue cache operation");
return results;
}

try {
Map<String, Object> some = future.getSome(timeout, TimeUnit.MILLISECONDS);
Expand All @@ -195,7 +230,6 @@ public String apply(
missCount.addAndGet(keyLookup.size() - some.size());
hitCount.addAndGet(some.size());

Map<NamedKey, byte[]> results = Maps.newHashMap();
for(Map.Entry<String, Object> entry : some.entrySet()) {
final NamedKey key = keyLookup.get(entry.getKey());
final byte[] value = (byte[]) entry.getValue();
Expand All @@ -212,7 +246,9 @@ public String apply(
throw Throwables.propagate(e);
}
catch(ExecutionException e) {
throw Throwables.propagate(e);
errorCount.incrementAndGet();
log.warn(e, "Exception pulling item from cache");
return results;
}
}

Expand Down