Merged
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
@@ -441,46 +442,46 @@ private void closeOpenIterators() {
}
}

private class RocksDbIterator implements KeyValueIterator<Bytes, byte[]> {
private class RocksDbIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements KeyValueIterator<Bytes, byte[]> {
private final String storeName;
private final RocksIterator iter;

private volatile boolean open = true;

private KeyValue<Bytes, byte[]> next;

RocksDbIterator(final String storeName,
final RocksIterator iter) {
this.iter = iter;
this.storeName = storeName;
}

byte[] peekRawKey() {
return iter.key();
}

private KeyValue<Bytes, byte[]> getKeyValue() {
return new KeyValue<>(new Bytes(iter.key()), iter.value());
}

@Override
public synchronized boolean hasNext() {
if (!open) {
throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName));
}
Member Author

@bbejeck bbejeck Jun 5, 2018

Having this check here has got me thinking more about this issue.

Without this guard condition, we have some failing unit tests.

In both RocksDbIterator and AbstractIterator, every call to next() calls hasNext() first before returning the next object. I'm not sure about changing the semantics so that next() returns without calling hasNext() first (and if we keep the current semantics, we are in the same position as before extending AbstractIterator).

I guess the question is, do we want to continue to throw an exception when hasNext() is called (when the store is closed) or simply return false?

I could be overthinking this, but I'm not entirely comfortable with returning a value from next() after closing the store. I feel like that creates more corner cases for potential errors or unexpected behavior.

WDYT?

Contributor

@bbejeck That is a good question!

Originally I thought it was okay to always call hasNext() inside next(), as long as we make sure the hasNext() implementation is idempotent, i.e. calling it multiple times before next() has no side effects. But by making it idempotent we could hit the corner case you mentioned. For example:

t0: call `hasNext()` -> store is still open -> call `makeNext` -> `next` field is set.
t1: store is closed.
t2: call `next()` -> call `hasNext()` again

Without this check, at t2 we would still return the `next` field.
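
The timeline above can be reproduced with a small self-contained sketch. `ClosableCachingIterator` is a hypothetical stand-in, not the actual `RocksDbIterator`: it caches the upcoming element in a field, roughly the way `AbstractIterator` does, and its `checkOpen` flag is an assumption added purely to toggle the guard under discussion. It throws `IllegalStateException` where the real code throws `InvalidStateStoreException`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for an iterator that caches its next element.
// Assumes the underlying data contains no null elements, so null can
// serve as the "nothing cached" marker.
class ClosableCachingIterator implements Iterator<String> {
    private final Iterator<String> source;
    private final boolean checkOpen;      // illustration only: enables the guard
    private volatile boolean open = true;
    private String cached;                // plays the role of the `next` field

    ClosableCachingIterator(final List<String> data, final boolean checkOpen) {
        this.source = data.iterator();
        this.checkOpen = checkOpen;
    }

    void close() {
        open = false;
    }

    @Override
    public synchronized boolean hasNext() {
        if (checkOpen && !open) {
            throw new IllegalStateException("store has closed");
        }
        // Idempotent: only advances the source when nothing is cached yet.
        if (cached == null && source.hasNext()) {
            cached = source.next();
        }
        return cached != null;
    }

    @Override
    public synchronized String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        final String result = cached;
        cached = null;
        return result;
    }
}
```

With `checkOpen` set to `false`, the sequence `hasNext()`, `close()`, `next()` still hands back the element cached at t0. With `checkOpen` set to `true`, the same sequence throws from `next()`, because `next()` goes through the guarded `hasNext()`.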


return iter.isValid();
return super.hasNext();
}

/**
* @throws NoSuchElementException if no next element exist
*/
@Override
public synchronized KeyValue<Bytes, byte[]> next() {
if (!hasNext())
throw new NoSuchElementException();
return super.next();
Member Author

Not sure if required but this method was synchronized in the first place so I've kept it that way.

Contributor

Ack.

}

final KeyValue<Bytes, byte[]> entry = this.getKeyValue();
iter.next();
return entry;
@Override
public KeyValue<Bytes, byte[]> makeNext() {
if (!iter.isValid()) {
return allDone();
} else {
next = this.getKeyValue();
iter.next();
return next;
}
}

private KeyValue<Bytes, byte[]> getKeyValue() {
return new KeyValue<>(new Bytes(iter.key()), iter.value());
Contributor

A nit (and paranoid) comment: maybe we can reuse the same KeyValue object and just set its key / value fields, since they are public and not final. That way we do not create short-lived objects for young-gen GC. Not sure how much it will really get us, but I just want to be safe since this is part of a critical code path (i.e. one object per iterated element).

Member Author

ack

Member Author

@bbejeck bbejeck Jun 5, 2018

On another look, KeyValue is immutable: its key and value fields are final. We could extend KeyValue as an inner class of RocksDBStore to accomplish this. WDYT?

NM that won't work.

Contributor

I see. Do not bother then :) At least we are not introducing a regression that makes perf worse :)

Member

I am late, so just a meta comment: we hand the KeyValue object to the user, and the user might keep a reference to it. Thus, we cannot reuse an object anyway, because we might break user code that accesses an earlier returned KeyValue again after retrieving newer ones.

Member Author

yeah, that's an excellent point.
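
The aliasing problem with object reuse can be illustrated with a minimal sketch. Everything here is hypothetical: `MutablePair` stands in for a mutable variant of `KeyValue` (the real class has final fields), and `ReusingIterator` is an iterator that overwrites one shared instance on every call to `next()`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical mutable pair; the real KeyValue has final fields.
class MutablePair {
    String key;
    String value;
}

// Hypothetical iterator that reuses a single MutablePair to avoid allocation.
class ReusingIterator implements Iterator<MutablePair> {
    private final Iterator<String> keys;
    private final MutablePair shared = new MutablePair();

    ReusingIterator(final List<String> keys) {
        this.keys = keys.iterator();
    }

    @Override
    public boolean hasNext() {
        return keys.hasNext();
    }

    @Override
    public MutablePair next() {
        final String k = keys.next();
        shared.key = k;                  // overwrites what earlier callers saw
        shared.value = k.toUpperCase();
        return shared;
    }
}

class ReuseDemo {
    // Mimics user code that keeps every returned pair and reads the keys later.
    static List<String> collectKeys(final Iterator<MutablePair> it) {
        final List<MutablePair> kept = new ArrayList<>();
        while (it.hasNext()) {
            kept.add(it.next());
        }
        final List<String> keys = new ArrayList<>();
        for (final MutablePair pair : kept) {
            keys.add(pair.key);
        }
        return keys;
    }
}
```

Calling `ReuseDemo.collectKeys(new ReusingIterator(Arrays.asList("a", "b", "c")))` yields the last key three times, because every retained reference points at the same shared object. Allocating a fresh object per element avoids this at the cost of the short-lived allocations mentioned earlier in the thread.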

}

@Override
@@ -500,7 +501,7 @@ public Bytes peekNextKey() {
if (!hasNext()) {
throw new NoSuchElementException();
}
return new Bytes(iter.key());
return next.key;
}
}

@@ -524,8 +525,17 @@ private class RocksDBRangeIterator extends RocksDbIterator {
}

@Override
public synchronized boolean hasNext() {
return super.hasNext() && comparator.compare(super.peekRawKey(), this.rawToKey) <= 0;
public KeyValue<Bytes, byte[]> makeNext() {
final KeyValue<Bytes, byte[]> next = super.makeNext();

if (next == null) {
return allDone();
} else {
if (comparator.compare(next.key.get(), this.rawToKey) <= 0)
return next;
else
return allDone();
}
}
}

@@ -66,7 +66,7 @@ public Bytes peekNextKey() {
@Override
public boolean hasNext() {
boolean hasNext = false;
while ((currentIterator == null || !(hasNext = hasNextCondition.hasNext(currentIterator)) || !currentSegment.isOpen())
while ((currentIterator == null || !(hasNext = hasNextConditionHasNext()) || !currentSegment.isOpen())
&& segments.hasNext()) {
close();
currentSegment = segments.next();
@@ -83,6 +83,16 @@ public boolean hasNext() {
return currentIterator != null && hasNext;
}

private boolean hasNextConditionHasNext() {
boolean hasNext = false;
try {
hasNext = hasNextCondition.hasNext(currentIterator);
} catch (InvalidStateStoreException e) {
//already closed so ignore
}
return hasNext;
}

public KeyValue<Bytes, byte[]> next() {
if (!hasNext()) {
throw new NoSuchElementException();
@@ -27,7 +27,6 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
@@ -61,7 +60,6 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@SuppressWarnings("PointlessArithmeticExpression")
public class RocksDBWindowStoreTest {
@@ -747,7 +745,7 @@ public void testInitialLoading() {
}

@Test
public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreExceptionOnHasNextAndNext() {
public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() {
windowStore = createWindowStore(context);
context.setRecordContext(createRecordContext(0));
windowStore.put(1, "one", 1L);
@@ -757,20 +755,9 @@ public void shouldCloseOpenIteratorsWhenStoreIsClosedAndThrowInvalidStateStoreEx
final WindowStoreIterator<String> iterator = windowStore.fetch(1, 1L, 3L);
assertTrue(iterator.hasNext());
windowStore.close();
try {
//noinspection ResultOfMethodCallIgnored
iterator.hasNext();
fail("should have thrown InvalidStateStoreException on closed store");
} catch (final InvalidStateStoreException e) {
// ok
}

try {
iterator.next();
fail("should have thrown InvalidStateStoreException on closed store");
} catch (final InvalidStateStoreException e) {
// ok
}
assertFalse(iterator.hasNext());

}

@Test
@@ -31,6 +31,7 @@
import org.junit.Test;

import java.util.Arrays;
import java.util.Collections;
import java.util.NoSuchElementException;

import static org.junit.Assert.assertEquals;
@@ -104,6 +105,19 @@ public void shouldIterateOverAllSegments() {
assertFalse(iterator.hasNext());
}

@Test
public void shouldNotThrowExceptionOnHasNextWhenStoreClosed() {
iterator = new SegmentIterator(Collections.singletonList(segmentOne).iterator(),
hasNextCondition,
Bytes.wrap("a".getBytes()),
Bytes.wrap("z".getBytes()));


iterator.currentIterator = segmentOne.all();
segmentOne.close();
assertFalse(iterator.hasNext());
Contributor

I guess that comment about verifying that next() throws also applies here.
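
One way to express that extra verification, sketched without any JUnit dependency: the hypothetical helper `isProperlyExhausted` (not part of this PR) checks both that `hasNext()` reports `false` and that `next()` then throws `NoSuchElementException`, which is what `SegmentIterator.next()` does when `hasNext()` is `false`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

class ExhaustedIteratorCheck {
    // Hypothetical helper: true only if hasNext() is false AND next() throws
    // NoSuchElementException. Any other exception propagates to the caller.
    static boolean isProperlyExhausted(final Iterator<?> iterator) {
        if (iterator.hasNext()) {
            return false;
        }
        try {
            iterator.next();
            return false;   // next() returned a value despite hasNext() == false
        } catch (final NoSuchElementException expected) {
            return true;
        }
    }
}
```

In a test such as the one above, this could be used as `assertTrue(ExhaustedIteratorCheck.isProperlyExhausted(iterator))` after the segment is closed, assuming the iterator type implements `java.util.Iterator`.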

}

@Test
public void shouldOnlyIterateOverSegmentsInRange() {
iterator = new SegmentIterator(Arrays.asList(segmentOne, segmentTwo).iterator(),