From c9efd08d5c564a945e5108980e560e14c61c6562 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Thu, 22 Mar 2018 18:19:12 -0400 Subject: [PATCH 1/7] KAFKA-6704: hasNext should throw InvalidStateStoreException from hasNextCondition.hasNext --- .../state/internals/SegmentIterator.java | 12 +++++- .../state/internals/SegmentIteratorTest.java | 41 +++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java index 099cba17328c8..331ffdb3769bf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentIterator.java @@ -66,7 +66,7 @@ public Bytes peekNextKey() { @Override public boolean hasNext() { boolean hasNext = false; - while ((currentIterator == null || !(hasNext = hasNextCondition.hasNext(currentIterator)) || !currentSegment.isOpen()) + while ((currentIterator == null || !(hasNext = hasNextConditionHasNext()) || !currentSegment.isOpen()) && segments.hasNext()) { close(); currentSegment = segments.next(); @@ -83,6 +83,16 @@ public boolean hasNext() { return currentIterator != null && hasNext; } + private boolean hasNextConditionHasNext() { + boolean hasNext = false; + try { + hasNext = hasNextCondition.hasNext(currentIterator); + } catch (InvalidStateStoreException e) { + //already closed so ignore + } + return hasNext; + } + public KeyValue next() { if (!hasNext()) { throw new NoSuchElementException(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java index 7a7b266537b4f..8187f446c6d97 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.test.InternalMockProcessorContext; @@ -104,6 +105,46 @@ public void shouldIterateOverAllSegments() { assertFalse(iterator.hasNext()); } + @Test + public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() { + iterator = new SegmentIterator(Arrays.asList(segmentOne, segmentTwo).iterator(), + hasNextCondition, + Bytes.wrap("a".getBytes()), + Bytes.wrap("z".getBytes())); + segmentTwo.close(); + segmentOne.close(); + + iterator.currentIterator = new KeyValueIterator() { + @Override + public void close() { + + } + + @Override + public Bytes peekNextKey() { + return null; + } + + @Override + public boolean hasNext() { + throw new InvalidStateStoreException("Store is closed"); + } + + @Override + public KeyValue next() { + return null; + } + + @Override + public void remove() { + + } + }; + + + assertFalse(iterator.hasNext()); + } + @Test public void shouldOnlyIterateOverSegmentsInRange() { iterator = new SegmentIterator(Arrays.asList(segmentOne, segmentTwo).iterator(), From d97ef624c85ecbf6f6dfbf124ebafadc6391970a Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Fri, 30 Mar 2018 18:03:57 -0400 Subject: [PATCH 2/7] KAFKA-6704: hasNext could throw when IQ iterates over store closed by StreamThread --- .../internals/RocksDBWindowStoreTest.java | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index be4ede8ae9149..87b1afeb1b633 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler; -import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; @@ -747,7 +746,7 @@ public void testInitialLoading() { } @Test - public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() { + public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() { windowStore = createWindowStore(context); context.setRecordContext(createRecordContext(0)); windowStore.put(1, "one", 1L); @@ -757,20 +756,9 @@ public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreEx final WindowStoreIterator iterator = windowStore.fetch(1, 1L, 3L); assertTrue(iterator.hasNext()); windowStore.close(); - try { - //noinspection ResultOfMethodCallIgnored - iterator.hasNext(); - fail("should have thrown InvalidStateStoreException on closed store"); - } catch (final InvalidStateStoreException e) { - // ok - } - try { - iterator.next(); - fail("should have thrown InvalidStateStoreException on closed store"); - } catch (final InvalidStateStoreException e) { - // ok - } + assertFalse(iterator.hasNext()); + } @Test From f078efe63f84b28e6e47307897ec7acde88a4257 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Wed, 4 Apr 2018 17:06:09 -0400 Subject: [PATCH 3/7] updates per comments --- .../state/internals/SegmentIteratorTest.java | 35 +++---------------- 1 file changed, 4 insertions(+), 31 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java index 8187f446c6d97..80459641f0f89 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SegmentIteratorTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.test.InternalMockProcessorContext; @@ -32,6 +31,7 @@ import org.junit.Test; import java.util.Arrays; +import java.util.Collections; import java.util.NoSuchElementException; import static org.junit.Assert.assertEquals; @@ -107,41 +107,14 @@ public void shouldIterateOverAllSegments() { @Test public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() { - iterator = new SegmentIterator(Arrays.asList(segmentOne, segmentTwo).iterator(), + iterator = new SegmentIterator(Collections.singletonList(segmentOne).iterator(), hasNextCondition, Bytes.wrap("a".getBytes()), Bytes.wrap("z".getBytes())); - segmentTwo.close(); - segmentOne.close(); - - iterator.currentIterator = new KeyValueIterator() { - @Override - public void close() { - - } - - @Override - public Bytes peekNextKey() { - return null; - } - - @Override - public boolean hasNext() { - throw new InvalidStateStoreException("Store is closed"); - } - - @Override - public KeyValue next() { - return null; - } - - @Override - public void remove() { - - } - }; + iterator.currentIterator = segmentOne.all(); + segmentOne.close(); assertFalse(iterator.hasNext()); } From 111ceb3e5d30c5e7aee9f0815263fcb55bbcd978 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Mon, 4 Jun 2018 10:57:34 -0400 Subject: [PATCH 4/7] fixed checkstyle issue --- .../kafka/streams/state/internals/RocksDBWindowStoreTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 87b1afeb1b633..c436e9e588f1c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -60,7 +60,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; @SuppressWarnings("PointlessArithmeticExpression") public class RocksDBWindowStoreTest { From 16e2a573dd76a158139572eb82cbbc203a2dc2b0 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Mon, 4 Jun 2018 22:10:30 -0400 Subject: [PATCH 5/7] Implement AbstractIterator --- .../streams/state/internals/RocksDBStore.java | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index d2b8cd2bac645..08d677d7ec260 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.AbstractIterator; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KeyValue; @@ -441,46 +442,37 @@ private void closeOpenIterators() { } } - private class RocksDbIterator implements KeyValueIterator { + private class RocksDbIterator extends AbstractIterator> implements KeyValueIterator { private final String storeName; private final RocksIterator iter; private volatile boolean open = true; + private KeyValue next; + RocksDbIterator(final String storeName, final RocksIterator iter) { this.iter = iter; this.storeName = storeName; } - byte[] peekRawKey() { - return iter.key(); - } - - private KeyValue getKeyValue() { - return new KeyValue<>(new Bytes(iter.key()), iter.value()); - } - @Override - public synchronized boolean hasNext() { + public KeyValue makeNext() { if (!open) { throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName)); } - return iter.isValid(); + if (!iter.isValid()) { + return allDone(); + } else { + next = this.getKeyValue(); + iter.next(); + return next; + } } - /** - * @throws NoSuchElementException if no next element exist - */ - @Override - public synchronized KeyValue next() { - if (!hasNext()) - throw new NoSuchElementException(); - - final KeyValue entry = this.getKeyValue(); - iter.next(); - return entry; + private KeyValue getKeyValue() { + return new KeyValue<>(new Bytes(iter.key()), iter.value()); } @Override @@ -500,7 +492,7 @@ public Bytes peekNextKey() { if (!hasNext()) { throw new NoSuchElementException(); } - return new Bytes(iter.key()); + return next.key; } } @@ -524,8 +516,17 @@ private class RocksDBRangeIterator extends RocksDbIterator { } @Override - public synchronized boolean hasNext() { - return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) <= 0; + public KeyValue makeNext() { + final KeyValue next = super.makeNext(); + + if (next == null) { + return allDone(); + } else { + if (comparator.compare(next.key.get(), this.rawToKey) <= 0) + return next; + else + return allDone(); + } } } From 9e8d65c74c016c26e7cc13deeba584e48f541efe Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Tue, 5 Jun 2018 09:52:23 -0400 Subject: [PATCH 6/7] Added check for if the store is closed on hasNext --- .../apache/kafka/streams/state/internals/RocksDBStore.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 08d677d7ec260..54db9290f8e91 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -457,11 +457,15 @@ private class RocksDbIterator extends AbstractIterator> } @Override - public KeyValue makeNext() { + public boolean hasNext() { if (!open) { throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName)); } + return super.hasNext(); + } + @Override + public KeyValue makeNext() { if (!iter.isValid()) { return allDone(); } else { From 01be095ec6d013ca54bd0774f915922b0911dff2 Mon Sep 17 00:00:00 2001 From: Bill Bejeck Date: Tue, 5 Jun 2018 10:13:01 -0400 Subject: [PATCH 7/7] added synchronized to hasNext and next --- .../apache/kafka/streams/state/internals/RocksDBStore.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 54db9290f8e91..ff6c56add84d0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -457,13 +457,18 @@ private class RocksDbIterator extends AbstractIterator> } @Override - public boolean hasNext() { + public synchronized boolean hasNext() { if (!open) { throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName)); } return super.hasNext(); } + @Override + public synchronized KeyValue next() { + return super.next(); + } + @Override public KeyValue makeNext() { if (!iter.isValid()) {