diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index 34c35b7c2fc2f..1ee6928e98f26 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -81,19 +81,17 @@ public void process(String dummy, String line) { @Override public void punctuate(long timestamp) { - KeyValueIterator iter = this.kvStore.all(); + try (KeyValueIterator iter = this.kvStore.all()) { + System.out.println("----------- " + timestamp + " ----------- "); - System.out.println("----------- " + timestamp + " ----------- "); + while (iter.hasNext()) { + KeyValue entry = iter.next(); - while (iter.hasNext()) { - KeyValue entry = iter.next(); + System.out.println("[" + entry.key + ", " + entry.value + "]"); - System.out.println("[" + entry.key + ", " + entry.value + "]"); - - context.forward(entry.key, entry.value.toString()); + context.forward(entry.key, entry.value.toString()); + } } - - iter.close(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index d13d11208d913..72029a8c24092 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.processor.AbstractProcessor; @@ -25,8 +24,8 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; -import java.util.Iterator; class KStreamKStreamJoin implements ProcessorSupplier { @@ -76,15 +75,15 @@ public void process(K key, V1 value) { long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs); long timeTo = Math.max(0L, context().timestamp() + joinAfterMs); - Iterator> iter = otherWindow.fetch(key, timeFrom, timeTo); - while (iter.hasNext()) { - needOuterJoin = false; - context().forward(key, joiner.apply(value, iter.next().value)); - } + try (WindowStoreIterator iter = otherWindow.fetch(key, timeFrom, timeTo)) { + while (iter.hasNext()) { + needOuterJoin = false; + context().forward(key, joiner.apply(value, iter.next().value)); + } - if (needOuterJoin) - context().forward(key, joiner.apply(value, null)); + if (needOuterJoin) + context().forward(key, joiner.apply(value, null)); + } } } - } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index b4272f89a827a..125c7fcc25d04 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -import java.util.Iterator; import java.util.Map; public class KStreamWindowAggregate implements KStreamAggProcessorSupplier, V, T> { @@ -90,38 +89,37 @@ public void process(K key, V value) { timeTo = windowStartMs > timeTo ? windowStartMs : timeTo; } - WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo); + try (WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo)) { - // for each matching window, try to update the corresponding key and send to the downstream - while (iter.hasNext()) { - KeyValue entry = iter.next(); - W window = matchedWindows.get(entry.key); + // for each matching window, try to update the corresponding key and send to the downstream + while (iter.hasNext()) { + KeyValue entry = iter.next(); + W window = matchedWindows.get(entry.key); - if (window != null) { + if (window != null) { - T oldAgg = entry.value; + T oldAgg = entry.value; - if (oldAgg == null) - oldAgg = initializer.apply(); + if (oldAgg == null) + oldAgg = initializer.apply(); - // try to add the new new value (there will never be old value) - T newAgg = aggregator.apply(key, value, oldAgg); + // try to add the new new value (there will never be old value) + T newAgg = aggregator.apply(key, value, oldAgg); - // update the store with the new value - windowStore.put(key, newAgg, window.start()); + // update the store with the new value + windowStore.put(key, newAgg, window.start()); - // forward the aggregated change pair - if (sendOldValues) - context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); - else - context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); + // forward the aggregated change pair + if (sendOldValues) + context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); + else + context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); - matchedWindows.remove(entry.key); + matchedWindows.remove(entry.key); + } } } - iter.close(); - // create the new window for the rest of unmatched window that do not exist yet for (long windowStartMs : matchedWindows.keySet()) { T oldAgg = initializer.apply(); @@ -167,10 +165,9 @@ public T get(Windowed windowedKey) { W window = (W) windowedKey.window(); // this iterator should contain at most one element - Iterator> iter = windowStore.fetch(key, window.start(), window.start()); - - return iter.hasNext() ? iter.next().value : null; + try (WindowStoreIterator iter = windowStore.fetch(key, window.start(), window.start())) { + return iter.hasNext() ? iter.next().value : null; + } } - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java index 3ed1499f658a8..a526506c17931 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java @@ -28,7 +28,6 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; -import java.util.Iterator; import java.util.Map; public class KStreamWindowReduce implements KStreamAggProcessorSupplier, V, V> { @@ -88,40 +87,38 @@ public void process(K key, V value) { timeTo = windowStartMs > timeTo ? windowStartMs : timeTo; } - WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo); + try (WindowStoreIterator iter = windowStore.fetch(key, timeFrom, timeTo)) { + // for each matching window, try to update the corresponding key and send to the downstream + while (iter.hasNext()) { + KeyValue entry = iter.next(); + W window = matchedWindows.get(entry.key); - // for each matching window, try to update the corresponding key and send to the downstream - while (iter.hasNext()) { - KeyValue entry = iter.next(); - W window = matchedWindows.get(entry.key); + if (window != null) { - if (window != null) { + V oldAgg = entry.value; + V newAgg = oldAgg; - V oldAgg = entry.value; - V newAgg = oldAgg; + // try to add the new new value (there will never be old value) + if (newAgg == null) { + newAgg = value; + } else { + newAgg = reducer.apply(newAgg, value); + } - // try to add the new new value (there will never be old value) - if (newAgg == null) { - newAgg = value; - } else { - newAgg = reducer.apply(newAgg, value); - } - - // update the store with the new value - windowStore.put(key, newAgg, window.start()); + // update the store with the new value + windowStore.put(key, newAgg, window.start()); - // forward the aggregated change pair - if (sendOldValues) - context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); - else - context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); + // forward the aggregated change pair + if (sendOldValues) + context().forward(new Windowed<>(key, window), new Change<>(newAgg, oldAgg)); + else + context().forward(new Windowed<>(key, window), new Change<>(newAgg, null)); - matchedWindows.remove(entry.key); + matchedWindows.remove(entry.key); + } } } - iter.close(); - // create the new window for the rest of unmatched window that do not exist yet for (long windowStartMs : matchedWindows.keySet()) { windowStore.put(key, value, windowStartMs); @@ -161,10 +158,9 @@ public V get(Windowed windowedKey) { W window = (W) windowedKey.window(); // this iterator should only contain one element - Iterator> iter = windowStore.fetch(key, window.start(), window.start()); - - return iter.next().value; + try (WindowStoreIterator iter = windowStore.fetch(key, window.start(), window.start())) { + return iter.next().value; + } } - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java index cdb3de5f90a2d..ddbc7b333b6b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java @@ -27,6 +27,9 @@ /** * Iterator interface of {@link KeyValue}. * + * Users need to call its {@code close} method explicitly upon completeness to release resources, + * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class. + * * @param Type of keys * @param Type of values */ diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java index 7c474dd60bf8a..b6e6d0c2df381 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStoreIterator.java @@ -21,13 +21,19 @@ import org.apache.kafka.streams.KeyValue; +import java.io.Closeable; import java.util.Iterator; /** * Iterator interface of {@link KeyValue} with key typed {@link Long} used for {@link WindowStore#fetch(Object, long, long)}. * + * Users need to call its {@code close} method explicitly upon completeness to release resources, + * or use try-with-resources statement (available since JDK7) for this {@link Closeable} class. + * * @param Type of values */ -public interface WindowStoreIterator extends Iterator> { +public interface WindowStoreIterator extends Iterator>, Closeable { + + @Override void close(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 37609a0d28b3a..a00de19926fc6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -77,17 +77,18 @@ public class RocksDBStore implements KeyValueStore { private final String name; private final String parentDir; - private final Options options; - private final WriteOptions wOptions; - private final FlushOptions fOptions; - + protected File dbDir; + private StateSerdes serdes; private final Serde keySerde; private final Serde valueSerde; - private StateSerdes serdes; - protected File dbDir; private RocksDB db; + // the following option objects will be created at constructor and disposed at close() + private Options options; + private WriteOptions wOptions; + private FlushOptions fOptions; + private boolean loggingEnabled = false; private int cacheSize = DEFAULT_UNENCODED_CACHE_SIZE; @@ -313,14 +314,16 @@ public void putAll(List> entries) { private void putAllInternal(List> entries) { WriteBatch batch = new WriteBatch(); - for (KeyValue entry : entries) { - batch.put(entry.key, entry.value); - } - try { + for (KeyValue entry : entries) { + batch.put(entry.key, entry.value); + } + db.write(wOptions, batch); } catch (RocksDBException e) { throw new ProcessorStateException("Error while batch writing to store " + this.name, e); + } finally { + batch.dispose(); } } @@ -425,7 +428,15 @@ public void flushInternal() { @Override public void close() { flush(); + options.dispose(); + wOptions.dispose(); + fOptions.dispose(); db.close(); + + options = null; + wOptions = null; + fOptions = null; + db = null; } private static class RocksDbIterator implements KeyValueIterator { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 1095fcf513ff2..62b283aefd944 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -351,9 +351,11 @@ public void process(String key, String value) { @Override public void punctuate(long streamTime) { int count = 0; - for (KeyValueIterator iter = store.all(); iter.hasNext();) { - iter.next(); - ++count; + try (KeyValueIterator iter = store.all()) { + while (iter.hasNext()) { + iter.next(); + ++count; + } } context().forward(Long.toString(streamTime), count); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 3a35d7542fcee..be5596d053626 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -362,9 +362,11 @@ public int checkForRestoredEntries(KeyValueStore store) { */ public int sizeOf(KeyValueStore store) { int size = 0; - for (KeyValueIterator iterator = store.all(); iterator.hasNext();) { - iterator.next(); - ++size; + try (KeyValueIterator iterator = store.all()) { + while (iterator.hasNext()) { + iterator.next(); + ++size; + } } return size; } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index e9888ada6be6c..d889e7b323429 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -785,9 +785,10 @@ public void send(ProducerRecord record, Serializer keySeria segmentDirs(baseDir) ); - WindowStoreIterator iter = store.fetch(0, 0L, 1000000L); - while (iter.hasNext()) { - iter.next(); + try (WindowStoreIterator iter = store.fetch(0, 0L, 1000000L)) { + while (iter.hasNext()) { + iter.next(); + } } assertEquals(