Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ static int memoryCacheEntrySize(final byte[] key, final byte[] value, final Stri
}

@Test
public void evict() throws IOException {
public void evict() {
final List<KeyValue<String, String>> received = new ArrayList<>();
final List<KeyValue<String, String>> expected = Collections.singletonList(
new KeyValue<>("K1", "V1"));
Expand All @@ -161,14 +161,10 @@ public void evict() throws IOException {
final ThreadCache cache = new ThreadCache(logContext,
memoryCacheEntrySize(kv.key.getBytes(), kv.value.getBytes(), ""),
new MockStreamsMetrics(new Metrics()));
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
received.add(new KeyValue<>(dirtyEntry.key().toString(), new String(dirtyEntry.newValue())));
}
cache.addDirtyEntryFlushListener(namespace, dirty -> {
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
received.add(new KeyValue<>(dirtyEntry.key().toString(), new String(dirtyEntry.newValue())));
}

});

for (final KeyValue<String, String> kvToInsert : toInsert) {
Expand Down Expand Up @@ -200,12 +196,7 @@ public void shouldNotFlushAfterDelete() {
final Bytes key = Bytes.wrap(new byte[]{0});
final ThreadCache cache = new ThreadCache(logContext, 10000L, new MockStreamsMetrics(new Metrics()));
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
received.addAll(dirty);
}
});
cache.addDirtyEntryFlushListener(namespace, received::addAll);
cache.put(namespace, key, dirtyEntry(key.get()));
assertEquals(key.get(), cache.delete(namespace, key).value());

Expand Down Expand Up @@ -298,12 +289,8 @@ public void shouldPeekAndIterateOverRange() {
public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
final int entrySize = memoryCacheEntrySize(new byte[1], new byte[1], "");
final ThreadCache cache = new ThreadCache(logContext, entrySize * 5, new MockStreamsMetrics(new Metrics()));
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
cache.addDirtyEntryFlushListener(namespace, dirty -> { });

}
});
final byte[][] bytes = {{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}};
for (int i = 0; i < 5; i++) {
cache.put(namespace, Bytes.wrap(bytes[i]), dirtyEntry(bytes[i]));
Expand All @@ -322,12 +309,9 @@ public void apply(final List<ThreadCache.DirtyEntry> dirty) {
public void shouldFlushDirtyEntriesForNamespace() {
final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
final List<byte[]> received = new ArrayList<>();
cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
received.add(dirtyEntry.key().get());
}
cache.addDirtyEntryFlushListener(namespace1, dirty -> {
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
received.add(dirtyEntry.key().get());
}
});
final List<byte[]> expected = Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2});
Expand All @@ -344,12 +328,9 @@ public void apply(final List<ThreadCache.DirtyEntry> dirty) {
public void shouldNotFlushCleanEntriesForNamespace() {
final ThreadCache cache = new ThreadCache(logContext, 100000, new MockStreamsMetrics(new Metrics()));
final List<byte[]> received = new ArrayList<>();
cache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
received.add(dirtyEntry.key().get());
}
cache.addDirtyEntryFlushListener(namespace1, dirty -> {
for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
received.add(dirtyEntry.key().get());
}
});
final List<byte[]> toInsert = Arrays.asList(new byte[]{0}, new byte[]{1}, new byte[]{2});
Expand All @@ -366,12 +347,7 @@ public void apply(final List<ThreadCache.DirtyEntry> dirty) {
private void shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(final ThreadCache cache) {
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();

cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
received.addAll(dirty);
}
});
cache.addDirtyEntryFlushListener(namespace, received::addAll);
cache.put(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{0}));
assertEquals(1, received.size());

Expand All @@ -396,12 +372,7 @@ public void shouldEvictImmediatelyIfCacheSizeIsZero() {
public void shouldEvictAfterPutAll() {
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new Metrics()));
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
received.addAll(dirty);
}
});
cache.addDirtyEntryFlushListener(namespace, received::addAll);

cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})),
KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}))));
Expand All @@ -425,12 +396,7 @@ public void shouldPutAll() {
public void shouldNotForwardCleanEntryOnEviction() {
final ThreadCache cache = new ThreadCache(logContext, 0, new MockStreamsMetrics(new Metrics()));
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
received.addAll(dirty);
}
});
cache.addDirtyEntryFlushListener(namespace, received::addAll);
cache.put(namespace, Bytes.wrap(new byte[]{1}), cleanEntry(new byte[]{0}));
assertEquals(0, received.size());
}
Expand All @@ -448,12 +414,7 @@ public void shouldPutIfAbsent() {
public void shouldEvictAfterPutIfAbsent() {
final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
final ThreadCache cache = new ThreadCache(logContext, 1, new MockStreamsMetrics(new Metrics()));
cache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
received.addAll(dirty);
}
});
cache.addDirtyEntryFlushListener(namespace, received::addAll);

cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5}));
cache.putIfAbsent(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6}));
Expand All @@ -468,26 +429,13 @@ public void shouldNotLoopForEverWhenEvictingAndCurrentCacheIsEmpty() {
final int maxCacheSizeInBytes = 100;
final ThreadCache threadCache = new ThreadCache(logContext, maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics()));
// trigger a put into another cache on eviction from "name"
threadCache.addDirtyEntryFlushListener(namespace, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
// put an item into an empty cache when the total cache size
// is already > than maxCacheSizeBytes
threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2]));
}
});
threadCache.addDirtyEntryFlushListener(namespace1, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {
//
}
});
threadCache.addDirtyEntryFlushListener(namespace2, new ThreadCache.DirtyEntryFlushListener() {
@Override
public void apply(final List<ThreadCache.DirtyEntry> dirty) {

}
threadCache.addDirtyEntryFlushListener(namespace, dirty -> {
// put an item into an empty cache when the total cache size
// is already > than maxCacheSizeBytes
threadCache.put(namespace1, Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[2]));
});
threadCache.addDirtyEntryFlushListener(namespace1, dirty -> { });
threadCache.addDirtyEntryFlushListener(namespace2, dirty -> { });

threadCache.put(namespace2, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));
threadCache.put(namespace, Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[1]));
Expand Down