Skip to content

KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store#4801

Merged
guozhangwang merged 7 commits intoapache:trunkfrom
bbejeck:KAFKA_6704_invalid_state_store_error_possible_from_iq
Jun 5, 2018
Merged

KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store#4801
guozhangwang merged 7 commits intoapache:trunkfrom
bbejeck:KAFKA_6704_invalid_state_store_error_possible_from_iq

Conversation

@bbejeck
Copy link
Copy Markdown
Member

@bbejeck bbejeck commented Mar 30, 2018

While using an iterator from IQ, it's possible to get an InvalidStateStoreException if the StreamThread closes the store during a range query.

Added a unit test to SegmentIteratorTest for this condition.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an expected condition before. This fix will suppress this Exception, do we want to consider another approach?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this not still throw and NoSuchElementException? Not sure if we need to test for this here thought. It's just for clarification.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

next() does. I agree about the test maybe we should remove it?
WDYT?

\cc @guozhangwang @vvcephei

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot follow with your comment. "maybe we should remove it" remove what?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test itself.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this test is fine -- make sense to ensure that hasNext does not throw. We should add a second test, that expects next() to throw IMHO.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we should keep the test.

The Iterator interface documents this relationship between next() and hasNext(). If it's in doubt whether next() will throw when hasNext() returns false, I think we should test it.

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Mar 30, 2018

@guozhangwang @mjsax @vvcephei for reviews

@mjsax mjsax requested review from dguy, guozhangwang and mjsax March 30, 2018 23:14
@mjsax mjsax added the streams label Mar 30, 2018
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this not still throw and NoSuchElementException? Not sure if we need to test for this here thought. It's just for clarification.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this? I thought close() will trigger an exception in the inner iterator already?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack. Cleaned the test up.

@bbejeck bbejeck force-pushed the KAFKA_6704_invalid_state_store_error_possible_from_iq branch from c3b3d43 to da22892 Compare April 4, 2018 21:17
@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Apr 4, 2018

updated this

Copy link
Copy Markdown
Contributor

@vvcephei vvcephei left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify for myself, is there any explicit or implied guarantee that the iterator will return all the records in the store?

Because this appears to be the behavior before this change: that if you drain the iterator until hasNext() returns false, you will see all records in the store. If you get an exception in the middle, you obviously know you may not have seen all records. But if we now just "end" the iterator when the state store is closed, I cannot distinguish whether the state store closed in the middle of my read or whether I read all the records.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we should keep the test.

The Iterator interface documents this relationship between next() and hasNext(). If it's in doubt whether next() will throw when hasNext() returns false, I think we should test it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that comment about verifying that next() throws also applies here.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Apr 5, 2018

Meta comment: @vvcephei Last comment just started this thought. We should rethink this change. Don't we introduce a race condition between hasNext() and next() ? If hasNext() return true and than the segment expires, next() might still throw....

Maybe it's better to just keep the current behavior that hasNext() might throw, and handle the exception in an upper layer?

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Apr 5, 2018

@vvcephei good point.

@mjsax you raise a good point as well if we should do this change at all as it will not cover the condition of hasNext() -> true; the store is closed; next() is called and throws...

I'm thinking now it's better to leave behavior as we have it. WDYT?

\cc @guozhangwang @mjsax @vvcephei

@vvcephei
Copy link
Copy Markdown
Contributor

vvcephei commented Apr 5, 2018

Because I hate to make any decision too easy, you can fix that race condition by buffering the next record in hasNext and using the existence of a record in the buffer to determine the answer to hasNext. This is how AbstractIterator in Guava works.

But I still kinda think that the current behavior might be better, for the reason I cited earlier ;)

It might be the case that we could document it better or throw a clearer exception.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Apr 12, 2018

I tend to agree, that just document the behavior in detail might be better -- would be good to hear @guozhangwang and @dguy opinion.

@guozhangwang
Copy link
Copy Markdown
Contributor

I think this issue can be resolved following the org.apache.kafka.common.utils.AbstractIterator pattern, i.e when we call hasNext we make the next element ready, so that next() will become a trivial call to return that item, and it does not check on hasNext again any more. By doing this we can remove the gap that can cause race condition.

Still, if we realize that in hasNext the store has closed, return false.

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Apr 18, 2018

sounds good to me.

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Apr 19, 2018

Thanks @guozhangwang -- that's along the lines what @vvcephei suggested. +1

@vvcephei
Copy link
Copy Markdown
Contributor

@guozhangwang @mjsax , that pattern would resolve the race condition, but I'm still wondering if we're better off throwing an exception.

If I'm iterating over a collection, and I reach the end of the iterator, wouldn't I assume that I've seen the whole collection?

For some context, I searched around a little to find out what happens in other databases. Apparently, in Oracle (and some other unnamed RDBMS), if you concurrently run a query and drop or truncate the table, depending on the exact options, you'll get one of these outcomes:

  • the table becomes unavailable for new queries, the query completes fully and normally, and then the table is destroyed
  • the table is immediately destroyed and the query immediately returns with an error (that the "object no longer exists")

FWIW, I think these semantics are actually simpler.

KIP-216 may come to bear on this topic. I think under that proposal we would throw some kind of RetriableException instead of InvalidStateStoreException. Maybe this is a good balance of safe and clear?

@guozhangwang
Copy link
Copy Markdown
Contributor

guozhangwang commented Apr 25, 2018

Note that the issue we're fixing only relates to range queries for window stores, which we organize as segmented stores. This is an internal implementation detail that users should be abstracted away with. So to user's point of view, as long as the whole window store is not closed (i.e. task not migrated out, thread not dying etc) it should always be queryable. So I think capturing the exception internally and skip the segment is a better idea.

When we capture the exception, we should not terminate the iterator by returning false to the outmost hasNext directly, but we should only skip that closed segment only; if there are more segments to iterate over then we should continue to them.

@guozhangwang
Copy link
Copy Markdown
Contributor

@bbejeck what's the status of this PR?

@bbejeck bbejeck force-pushed the KAFKA_6704_invalid_state_store_error_possible_from_iq branch from da22892 to 3323708 Compare June 4, 2018 14:58
@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Jun 4, 2018

rebased this

@guozhangwang
Copy link
Copy Markdown
Contributor

retest this please.

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can still try out the approach with

private class RocksDbIterator extends AbstractIterator<KeyValue<Bytes, byte[]>> implements KeyValueIterator<Bytes, byte[]>

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Jun 4, 2018

I have an update using AbstractIterator just need to get some other tests passing will update this PR soon

@bbejeck bbejeck force-pushed the KAFKA_6704_invalid_state_store_error_possible_from_iq branch from 3323708 to 9e8d65c Compare June 5, 2018 13:55
@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Jun 5, 2018

updated this using AbstractIterator

public synchronized KeyValue<Bytes, byte[]> next() {
if (!hasNext())
throw new NoSuchElementException();
return super.next();
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if required but this method was synchronized in the first place so I've kept it that way.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack.

public synchronized boolean hasNext() {
if (!open) {
throw new InvalidStateStoreException(String.format("RocksDB store %s has closed", storeName));
}
Copy link
Copy Markdown
Member Author

@bbejeck bbejeck Jun 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having this check here has got me to thinking more about this issue.

Without this guard condition, we have some failing unit tests.

In both the RocksDBIterator and the AbstractIterator all calls to next() make a call to hasNext() first before returning the next object. I'm not sure about changing the semantics where we return from next() without calling hasNext() first (which if we end up keeping those semantics, leaves us in the same position as before extending AbstractIterator).

I guess the question is, do we want to continue to throw an exception when hasNext() is called (when the store is closed) or simply return false?

I could be overthinking this, but I'm not entirely comfortable with returning a value from next() after closing the store. I feel like that creates more corner cases for potential errors or unexpected behavior.

WDYT?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bbejeck That is a good question!

Originally I thought it is okay to always calling hasNext inside next(), as long as we make sure hasNext implementation is idempotent, i.e. calling it multiple times before next() does not have side effect is sufficient. But by making it idempotent we could have the corner case you mentioned. For example:

t0: call `hasNext()` -> store is still open -> call `makeNext` -> `next` field is set.
t1: store is closed.
t2: call `next()` -> call `hasNext()` again

Without this check, at t3 we would still return the next field.

}

private KeyValue<Bytes, byte[]> getKeyValue() {
return new KeyValue<>(new Bytes(iter.key()), iter.value());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A nit (and paranoid) comment: maybe we can reuse the same KeyValue object, but just set its key / value fields since they are public and not final. So we do not create those short-lived objects for young gen GC. Not sure how much it will really get us, but just want to be safer since it is part of a critical code path (i.e. one object per each iterated element).

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

Copy link
Copy Markdown
Member Author

@bbejeck bbejeck Jun 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With another look, KeyValue is immutable key and value fields are final. We could extend KeyValue as an inner class of RocksDBStore to accomplish this. WDYT?

NM that won't work.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Do not bother then :) At lease we are not introduce a regression to make perf worse :)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am late thus just a meta comment: we hand the KeyValue object to the user and user might actually keep a reference. Thus, we cannot reuse an object anyway, because we might mess up user code if they access an earlier return KeyValue again, after they retrieved newer ones.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, that's an excellent point.

@guozhangwang guozhangwang merged commit ef41369 into apache:trunk Jun 5, 2018
ijuma added a commit to edoardocomar/kafka that referenced this pull request Jun 6, 2018
…grained-acl-create-topics

* apache-github/trunk:
  KAFKA-5588: Remove deprecated --new-consumer tools option (apache#5097)
  MINOR: Fix for the location of the trogdor.sh executable file in the documentation. (apache#5040)
  KAFKA-6997: Exclude test-sources.jar when $INCLUDE_TEST_JARS is FALSE
  MINOR: docs should point to latest version (apache#5132)
  KAFKA-6981: Move the error handling configuration properties into the ConnectorConfig and SinkConnectorConfig classes (KIP-298)
  [KAFKA-6730] Simplify State Store Recovery (apache#5013)
  MINOR: Rename package `internal` to `internals` for consistency (apache#5137)
  KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store (apache#4801)
  MINOR: Add missing configs for resilience settings
  MINOR: Add regression tests for KTable mapValues and filter (apache#5134)
  KAFKA-6750: Add listener name to authentication context (KIP-282) (apache#4829)
  KAFKA-3665: Enable TLS hostname verification by default (KIP-294) (apache#4956)
  KAFKA-6938: Add documentation for accessing Headers on Kafka Streams Processor API (apache#5128)
  KAFKA-6813: return to double-counting for count topology names (apache#5075)
  KAFKA-5919; Adding checks on "version" field for tools using it
  MINOR: Remove deprecated KafkaStreams constructors in docs (apache#5118)
ijuma added a commit to big-andy-coates/kafka that referenced this pull request Jun 6, 2018
…refix

* apache-github/trunk:
  KAFKA-6726: Fine Grained ACL for CreateTopics (KIP-277) (apache#4795)
  KAFKA-5588: Remove deprecated --new-consumer tools option (apache#5097)
  MINOR: Fix for the location of the trogdor.sh executable file in the documentation. (apache#5040)
  KAFKA-6997: Exclude test-sources.jar when $INCLUDE_TEST_JARS is FALSE
  MINOR: docs should point to latest version (apache#5132)
  KAFKA-6981: Move the error handling configuration properties into the ConnectorConfig and SinkConnectorConfig classes (KIP-298)
  [KAFKA-6730] Simplify State Store Recovery (apache#5013)
  MINOR: Rename package `internal` to `internals` for consistency (apache#5137)
  KAFKA-6704: InvalidStateStoreException from IQ when StreamThread closes store (apache#4801)
  MINOR: Add missing configs for resilience settings
  MINOR: Add regression tests for KTable mapValues and filter (apache#5134)
  KAFKA-6750: Add listener name to authentication context (KIP-282) (apache#4829)
  KAFKA-3665: Enable TLS hostname verification by default (KIP-294) (apache#4956)
  KAFKA-6938: Add documentation for accessing Headers on Kafka Streams Processor API (apache#5128)
  KAFKA-6813: return to double-counting for count topology names (apache#5075)
  KAFKA-5919; Adding checks on "version" field for tools using it
  MINOR: Remove deprecated KafkaStreams constructors in docs (apache#5118)
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
…es store (apache#4801)

While using an iterator from IQ, it's possible to get an InvalidStateStoreException if the StreamThread closes the store during a range query.

Added a unit test to SegmentIteratorTest for this condition.

Reviewers: John Roesler <john@confluent.io>, Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
@bbejeck bbejeck deleted the KAFKA_6704_invalid_state_store_error_possible_from_iq branch July 10, 2024 12:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants