diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index d2b8cd2bac645..ff6c56add84d0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.AbstractIterator; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; @@ -441,46 +442,46 @@ private void closeOpenIterators() { } } - private class RocksDbIterator implements KeyValueIterator { + private class RocksDbIterator extends AbstractIterator> implements KeyValueIterator { private final String storeName; private final RocksIterator iter; private volatile boolean open = true; + private KeyValue next; + RocksDbIterator(final String storeName, final RocksIterator iter) { this.iter = iter; this.storeName = storeName; } - byte[] peekRawKey() { - return iter.key(); - } - - private KeyValue getKeyValue() { - return new KeyValue<>(new Bytes(iter.key()), iter.value()); - } - @Override public synchronized boolean hasNext() { if (!open) { throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName)); } - - return iter.isValid(); + return super.hasNext(); } - /** - * @throws NoSuchElementException if no next element exist - */ @Override public synchronized KeyValue next() { - if (!hasNext()) - throw new NoSuchElementException(); + return super.next(); + } - final KeyValue entry = this.getKeyValue(); - iter.next(); - return entry; + @Override + public KeyValue makeNext() { + if (!iter.isValid()) { + return allDone(); + } else { + next = this.getKeyValue(); + iter.next(); + return next; + } + } + + private KeyValue getKeyValue() { + return new KeyValue<>(new Bytes(iter.key()), iter.value()); } @Override @@ -500,7 +501,7 @@ public Bytes peekNextKey() { if (!hasNext()) { throw new NoSuchElementException(); } - return new Bytes(iter.key()); + return next.key; } } @@ -524,8 +525,17 @@ private class RocksDBRangeIterator extends RocksDbIterator { } @Override - public synchronized boolean hasNext() { - return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) <= 0; + public KeyValue makeNext() { + final KeyValue next = super.makeNext(); + + if (next == null) { + return allDone(); + } else { + if (comparator.compare(next.key.get(), this.rawToKey) <= 0) + return next; + else + return allDone(); + } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java index 099cba17328c8..331ffdb3769bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java @@ -66,7 +66,7 @@ public Bytes peekNextKey() { @Override public boolean hasNext() { boolean hasNext = false; - while ((currentIterator == null || !(hasNext = hasNextCondition.hasNext(currentIterator)) || !currentSegment.isOpen()) + while ((currentIterator == null || !(hasNext = hasNextConditionHasNext()) || !currentSegment.isOpen()) && segments.hasNext()) { close(); currentSegment = segments.next(); @@ -83,6 +83,16 @@ public boolean hasNext() { return currentIterator != null && hasNext; } + private boolean hasNextConditionHasNext() { + boolean hasNext = false; + try { + hasNext = hasNextCondition.hasNext(currentIterator); + } catch (InvalidStateStoreException e) { + //already closed so ignore + } + return hasNext; + } + public KeyValue next() { if (!hasNext()) { throw new NoSuchElementException(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index be4ede8ae9149..c436e9e588f1c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; -import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; @@ -61,7 +60,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; @SuppressWarnings("PointlessArithmeticExpression") public class RocksDBWindowStoreTest { @@ -747,7 +745,7 @@ public void testInitialLoading() { } @Test - public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() { + public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() { windowStore = createWindowStore(context); context.setRecordContext(createRecordContext(0)); windowStore.put(1, "one", 1L); @@ -757,20 +755,9 @@ public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreEx final WindowStoreIterator iterator = windowStore.fetch(1, 1L, 3L); assertTrue(iterator.hasNext()); windowStore.close(); - try { - //noinspection ResultOfMethodCallIgnored - iterator.hasNext(); - fail("should have thrown InvalidStateStoreException on closed store"); - } catch (final InvalidStateStoreException e) { - // ok - } - try { - iterator.next(); - fail("should have thrown InvalidStateStoreException on closed store"); - } catch (final InvalidStateStoreException e) { - // ok - } + assertFalse(iterator.hasNext()); + } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java index 7a7b266537b4f..80459641f0f89 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java @@ -31,6 +31,7 @@ import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.NoSuchElementException; import static org.junit.Assert.assertEquals; @@ -104,6 +105,19 @@ public void shouldIterateOverAllSegments() { assertFalse(iterator.hasNext()); } + @Test + public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() { + iterator = new SegmentIterator(Collections.singletonList(segmentOne).iterator(), + hasNextCondition, + Bytes.wrap("a".getBytes()), + Bytes.wrap("z".getBytes())); + + + iterator.currentIterator = segmentOne.all(); + segmentOne.close(); + assertFalse(iterator.hasNext()); + } + @Test public void shouldOnlyIterateOverSegmentsInRange() { iterator = new SegmentIterator(Arrays.asList(segmentOne, segmentTwo).iterator(),