From e561a48c4dfa3057b60a55ce1540382d4324dcbe Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Thu, 19 May 2016 20:17:15 -0700 Subject: [PATCH 1/3] dispose all rocksobejcts --- .../wordcount/WordCountProcessorDemo.java | 16 +++--- .../kstream/internals/KStreamKStreamJoin.java | 19 +++---- .../internals/KStreamWindowAggregate.java | 49 ++++++++-------- .../internals/KStreamWindowReduce.java | 56 +++++++++---------- .../kafka/streams/state/KeyValueIterator.java | 2 + .../streams/state/WindowStoreIterator.java | 5 +- .../streams/state/internals/RocksDBStore.java | 13 +++-- .../internals/ProcessorTopologyTest.java | 8 ++- .../state/KeyValueStoreTestDriver.java | 8 ++- .../internals/RocksDBWindowStoreTest.java | 7 ++- 10 files changed, 94 insertions(+), 89 deletions(-) diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index 34c35b7c2fc2f..1ee6928e98f26 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -81,19 +81,17 @@ public void process(String dummy, String line) { @Override public void punctuate(long timestamp) { - KeyValueIterator iter = this.kvStore.all(); + try (KeyValueIterator iter = this.kvStore.all()) { + System.out.println("----------- " + timestamp + " ----------- "); - System.out.println("----------- " + timestamp + " ----------- "); + while (iter.hasNext()) { + KeyValue entry = iter.next(); - while (iter.hasNext()) { - KeyValue entry = iter.next(); + System.out.println("[" + entry.key + ", " + entry.value + "]"); - System.out.println("[" + entry.key + ", " + entry.value + "]"); - - context.forward(entry.key, entry.value.toString()); + context.forward(entry.key, entry.value.toString()); + } } - - iter.close(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index d13d11208d913..72029a8c24092 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.AbstractProcessor; @@ -25,8 +24,8 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; -import java.util.Iterator; class KStreamKStreamJoin implements ProcessorSupplier { @@ -76,15 +75,15 @@ public void process(K key, V1 value) { long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs); long timeTo = Math.max(0L, context().timestamp() + joinAfterMs); - Iterator> iter = otherWindow.fetch(key, timeFrom, timeTo); - while (iter.hasNext()) { - needOuterJoin = false; - context().forward(key, joiner.apply(value, iter.next().value)); - } + try (WindowStoreIterator iter = otherWindow.fetch(key, timeFrom, timeTo)) { + while (iter.hasNext()) { + needOuterJoin = false; + context().forward(key, joiner.apply(value, iter.next().value)); + } - if (needOuterJoin) - context().forward(key, joiner.apply(value, null)); + if (needOuterJoin) + context().forward(key, joiner.apply(value, null)); + } } } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index b4272f89a827a..125c7fcc25d04 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -import java.util.Iterator; import java.util.Map; public class KStreamWindowAggregate implements KStreamAggProcessorSupplier, V, T> { @@ -90,38 +89,37 @@ public void process(K key, V value) { timeTo = windowStartMs > timeTo ? windowStartMs : timeTo; } - WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo); + try (WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo)) { - // for each matching window, try to update the corresponding key and send to the downstream - while (iter.hasNext()) { - KeyValue entry = iter.next(); - W window = matchedWindows.get(entry.key); + // for each matching window, try to update the corresponding key and send to the downstream + while (iter.hasNext()) { + KeyValue entry = iter.next(); + W window = matchedWindows.get(entry.key); - if (window != null) { + if (window != null) { - T oldAgg = entry.value; + T oldAgg = entry.value; - if (oldAgg == null) - oldAgg = initializer.apply(); + if (oldAgg == null) + oldAgg = initializer.apply(); - // try to add the new new value (there will never be old value) - T newAgg = aggregator.apply(key, value, oldAgg); + // try to add the new new value (there will never be old value) + T newAgg = aggregator.apply(key, value, oldAgg); - // update the store with the new value - windowStore.put(key, newAgg, window.start()); + // update the store with the new value + windowStore.put(key, newAgg, window.start()); - // forward the aggregated change pair - if (sendOldValues) - context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); - else - context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); + // forward the aggregated change pair + if (sendOldValues) + context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); + else + context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); - matchedWindows.remove(entry.key); + matchedWindows.remove(entry.key); + } } } - iter.close(); - // create the new window for the rest of unmatched window that do not exist yet for (long windowStartMs : matchedWindows.keySet()) { T oldAgg = initializer.apply(); @@ -167,10 +165,9 @@ public T get(Windowed windowedKey) { W window = (W) windowedKey.window(); // this iterator should contain at most one element - Iterator> iter = windowStore.fetch(key, window.start(), window.start()); - - return iter.hasNext() ? iter.next().value : null; + try (WindowStoreIterator iter = windowStore.fetch(key, window.start(), window.start())) { + return iter.hasNext() ? iter.next().value : null; + } } - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java index 3ed1499f658a8..a526506c17931 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -import java.util.Iterator; import java.util.Map; public class KStreamWindowReduce implements KStreamAggProcessorSupplier, V, V> { @@ -88,40 +87,38 @@ public void process(K key, V value) { timeTo = windowStartMs > timeTo ? windowStartMs : timeTo; } - WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo); + try (WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo)) { + // for each matching window, try to update the corresponding key and send to the downstream + while (iter.hasNext()) { + KeyValue entry = iter.next(); + W window = matchedWindows.get(entry.key); - // for each matching window, try to update the corresponding key and send to the downstream - while (iter.hasNext()) { - KeyValue entry = iter.next(); - W window = matchedWindows.get(entry.key); + if (window != null) { - if (window != null) { + V oldAgg = entry.value; + V newAgg = oldAgg; - V oldAgg = entry.value; - V newAgg = oldAgg; + // try to add the new new value (there will never be old value) + if (newAgg == null) { + newAgg = value; + } else { + newAgg = reducer.apply(newAgg, value); + } - // try to add the new new value (there will never be old value) - if (newAgg == null) { - newAgg = value; - } else { - newAgg = reducer.apply(newAgg, value); - } - - // update the store with the new value - windowStore.put(key, newAgg, window.start()); + // update the store with the new value + windowStore.put(key, newAgg, window.start()); - // forward the aggregated change pair - if (sendOldValues) - context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); - else - context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); + // forward the aggregated change pair + if (sendOldValues) + context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); + else + context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); - matchedWindows.remove(entry.key); + matchedWindows.remove(entry.key); + } } } - iter.close(); - // create the new window for the rest of unmatched window that do not exist yet for (long windowStartMs : matchedWindows.keySet()) { windowStore.put(key, value, windowStartMs); @@ -161,10 +158,9 @@ public V get(Windowed windowedKey) { W window = (W) windowedKey.window(); // this iterator should only contain one element - Iterator> iter = windowStore.fetch(key, window.start(), window.start()); - - return iter.next().value; + try (WindowStoreIterator iter = windowStore.fetch(key, window.start(), window.start())) { + return iter.next().value; + } } - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java index cdb3de5f90a2d..ea997363cb01e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java @@ -27,6 +27,8 @@ /** * Iterator interface of {@link KeyValue}. * + * Users need to call its {@code close} method explicitly upon completeness to release resources. + * * @param Type of keys * @param Type of values */ diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java index 7c474dd60bf8a..a619f4cc325fb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java @@ -21,6 +21,7 @@ import org.apache.kafka.streams.KeyValue; +import java.io.Closeable; import java.util.Iterator; /** @@ -28,6 +29,8 @@ * * @param Type of values */ -public interface WindowStoreIterator extends Iterator> { +public interface WindowStoreIterator extends Iterator>, Closeable { + + @Override void close(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 37609a0d28b3a..c7fd0765e7727 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -313,14 +313,16 @@ public void putAll(List> entries) { private void putAllInternal(List> entries) { WriteBatch batch = new WriteBatch(); - for (KeyValue entry : entries) { - batch.put(entry.key, entry.value); - } - try { + for (KeyValue entry : entries) { + batch.put(entry.key, entry.value); + } + db.write(wOptions, batch); } catch (RocksDBException e) { throw new ProcessorStateException("Error while batch writing to store " + this.name, e); + } finally { + batch.dispose(); } } @@ -425,6 +427,9 @@ public void flushInternal() { @Override public void close() { flush(); + options.dispose(); + wOptions.dispose(); + fOptions.dispose(); db.close(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 1095fcf513ff2..62b283aefd944 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -351,9 +351,11 @@ public void process(String key, String value) { @Override public void punctuate(long streamTime) { int count = 0; - for (KeyValueIterator iter = store.all(); iter.hasNext();) { - iter.next(); - ++count; + try (KeyValueIterator iter = store.all()) { + while (iter.hasNext()) { + iter.next(); + ++count; + } } context().forward(Long.toString(streamTime), count); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 3a35d7542fcee..be5596d053626 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -362,9 +362,11 @@ public int checkForRestoredEntries(KeyValueStore store) { */ public int sizeOf(KeyValueStore store) { int size = 0; - for (KeyValueIterator iterator = store.all(); iterator.hasNext();) { - iterator.next(); - ++size; + try (KeyValueIterator iterator = store.all()) { + while (iterator.hasNext()) { + iterator.next(); + ++size; + } } return size; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index e9888ada6be6c..d889e7b323429 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -785,9 +785,10 @@ public void send(ProducerRecord record, Serializer keySeria segmentDirs(baseDir) ); - WindowStoreIterator iter = store.fetch(0, 0L, 1000000L); - while (iter.hasNext()) { - iter.next(); + try (WindowStoreIterator iter = store.fetch(0, 0L, 1000000L)) { + while (iter.hasNext()) { + iter.next(); + } } assertEquals( From 20a38a99220c14824b998f585856ed5086016c88 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 20 May 2016 08:31:54 -0700 Subject: [PATCH 2/3] github comments --- .../java/org/apache/kafka/streams/state/KeyValueIterator.java | 3 ++- .../org/apache/kafka/streams/state/WindowStoreIterator.java | 3 +++ .../org/apache/kafka/streams/state/internals/RocksDBStore.java | 1 + 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java index ea997363cb01e..ddbc7b333b6b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java @@ -27,7 +27,8 @@ /** * Iterator interface of {@link KeyValue}. * - * Users need to call its {@code close} method explicitly upon completeness to release resources. + * Users need to call its {@code close} method explicitly upon completeness to release resources, + * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class. * * @param Type of keys * @param Type of values diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java index a619f4cc325fb..b6e6d0c2df381 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java @@ -27,6 +27,9 @@ /** * Iterator interface of {@link KeyValue} with key typed {@link Long} used for {@link WindowStore#fetch(Object, long, long)}. * + * Users need to call its {@code close} method explicitly upon completeness to release resources, + * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class. + * * @param Type of values */ public interface WindowStoreIterator extends Iterator>, Closeable { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index c7fd0765e7727..669837da82e2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -431,6 +431,7 @@ public void close() { wOptions.dispose(); fOptions.dispose(); db.close(); + db = null; } private static class RocksDbIterator implements KeyValueIterator { From 62fdbb401e903eb7156d7512c201a5f09ab21356 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 20 May 2016 09:17:04 -0700 Subject: [PATCH 3/3] github comments --- .../streams/state/internals/RocksDBStore.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 669837da82e2f..a00de19926fc6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -77,17 +77,18 @@ public class RocksDBStore implements KeyValueStore { private final String name; private final String parentDir; - private final Options options; - private final WriteOptions wOptions; - private final FlushOptions fOptions; - + protected File dbDir; + private StateSerdes serdes; private final Serde keySerde; private final Serde valueSerde; - private StateSerdes serdes; - protected File dbDir; private RocksDB db; + // the following option objects will be created at constructor and disposed at close() + private Options options; + private WriteOptions wOptions; + private FlushOptions fOptions; + private boolean loggingEnabled = false; private int cacheSize = DEFAULT_UNENCODED_CACHE_SIZE; @@ -431,6 +432,10 @@ public void close() { wOptions.dispose(); fOptions.dispose(); db.close(); + + options = null; + wOptions = null; + fOptions = null; db = null; }