KAFKA-7912: Support concurrent access in InMemoryKeyValueStore#6336
Conversation
|
|
||
| @Override | ||
| public synchronized KeyValueIterator<Bytes, byte[]> all() { | ||
| final TreeMap<Bytes, byte[]> copy = new TreeMap<>(this.map); |
There was a problem hiding this comment.
No need to copy the entire underlying store, just return an iterator on the existing map
|
\cc @vvcephei for review |
|
Java 8 failed retest this please |
bbejeck
left a comment
There was a problem hiding this comment.
Thanks, @ableegoldman good catch.
Overall it looks good to me, but this got me thinking.
Since we only have a store per task and a StreamThread only operates on one task at a time (even if it is assigned multiple tasks) could we hit this point in a streams application under regular circumstances? I guess the risk comes from the fact that someone could open an iterator and hold on to it without closing and then during further processing the map is updated and since the iterator is a view back to the original map, using the iterator again results in the exception
I'm not suggesting we don't merge this PR, as IMHO it's an improvement, it's got me thinking about our store access patterns.
|
It does seem like we're adding "some" overhead here to deal with edge cases of weird behavior for IQ only, since as you say we shouldn't hit this problem under normal operation. Might it be worth implementing two versions of the underlying stores, an optimized one for normal Streams and a "safe" one for IQ? I'll try to investigate how much of an overhead we're really introducing here, I suspect it's not terrible for the key-value store but might be less tolerable for the window store (or future session store) since I believe SkipList performance would degrade as we continually delete from the beginning as things expire..? |
I'm leaning towards no, if there is any additional overhead I don't think it would be that much and returning thread-safe iterators for IQ would outweigh the cost of maintaining two separate types. Additionally, we have |
|
Java 11 failed with Java 8 failed with retest this please |
|
Good point about performance degradation of SkipList if most of the modifications are at the top -- well depending on how clever J8's SkipList implementation is, as they can go as fancy as this: https://arxiv.org/pdf/1805.04794.pdf (related works introduced skiplist). But I think for now we can still afford for the overhead, even for windowed in-memory store: if we realized later in production that it is really bad we can come back to it as well. |
|
LGTM (java8 will not succeed so we can ignore atm). |
|
Also confirmed that the added test will fail without the other change. Merged to trunk, thanks @ableegoldman !! |
* apache/trunk: KAFKA-7880:Naming worker thread by task id (apache#6275) improve some logging statements (apache#6078) KAFKA-7312: Change broker port used in testMinimumRequestTimeouts and testForceClose KAFKA-7997: Use automatic RPC generation in SaslAuthenticate KAFKA-8002; Log dir reassignment stalls if future replica has different segment base offset (apache#6346) KAFKA-3522: Add TimestampedKeyValueStore builder/runtime classes (apache#6152) HOTFIX: add igore import to streams_upgrade_test MINOR: ConsumerNetworkClient does not need to send the remaining requests when the node is not ready (apache#6264) KAFKA-7922: Return authorized operations in describe consumer group responses (KIP-430 Part-1) KAFKA-7918: Inline generic parameters Pt. III: in-memory window store (apache#6328) KAFKA-8012; Ensure partitionStates have not been removed before truncating. (apache#6333) KAFKA-8011: Fix for race condition causing concurrent modification exception (apache#6338) KAFKA-7912: Support concurrent access in InMemoryKeyValueStore (apache#6336) MINOR: Skip quota check when replica is in sync (apache#6344) HOTFIX: Change header back to http instead of https to path license header test (apache#6347) MINOR: fix release.py script (apache#6317) MINOR: Remove types from caching stores (apache#6331) MINOR: Improve logging for alter log dirs (apache#6302) MINOR: state.cleanup.delay.ms default is 600,000 ms (10 minutes). (apache#6345) MINOR: disable Streams system test for broker upgrade/downgrade (apache#6341)
…e#6336) Previously the InMemoryKeyValue store would throw a ConcurrentModificationException if the store was modified beneath an open iterator. The TreeMap implementation was swapped with a ConcurrentSkipListMap for similar performance while supporting concurrent access. Added one test to AbstractKeyValueStoreTest, no existing tests caught this. Reviewers: Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
Previously the InMemoryKeyValue store would throw a ConcurrentModificationException if the store was modified beneath an open iterator. The TreeMap implementation was swapped with a ConcurrentSkipListMap for similar performance while supporting concurrent access.
Added one test to AbstractKeyValueStoreTest, no existing tests caught this.
Committer Checklist (excluded from commit message)