KAFKA-6878 NPE when querying global state store not in READY state#4978
KAFKA-6878 NPE when querying global state store not in READY state#4978guozhangwang merged 3 commits intoapache:trunkfrom tedyu:trunk
Conversation
|
Should we consider two other CachingWindowed and CachingSessioned store classes as well? A meta comment: instead of checking on |
|
I can apply similar change to CachingWindowStore and CachingSessionStore if people think the check is beneficial. w.r.t. additional locking, I tend to leave that to future JIRA. Additional locking increases the complexity of the code. |
|
With additional locking, client query may take arbitrarily long time. |
|
Java 8 error caused by out of memory. retest this please. |
|
Looking at CachingKeyValueStoreTest, |
guozhangwang
left a comment
There was a problem hiding this comment.
Also, with current change, client code can detect the completion of initialization, making the additional locking unnecessary (at least for now).
I'm a bit unclear how the client can detect if the initialization is completed? If get returns null it could mean 1) init has done, but the key / window / session does not exist, 2) init is not done. How to distinguish these two cases?
| validateStoreOpen(); | ||
| final Bytes bytesKey = WindowKeySchema.toStoreKeyBinary(key, timestamp, 0); | ||
| final Bytes cacheKey = cacheFunction.cacheKey(bytesKey); | ||
| if (cache == null) { |
There was a problem hiding this comment.
In CachingSessionStore#findSessions, we could also check if cache is null.
There was a problem hiding this comment.
Looking at CachingSessionStore#findSessions, an Iterator is returned.
final ThreadCache.MemoryLRUCacheBytesIterator cacheIterator = cache.range(cacheName, cacheKeyFrom, cacheKeyTo);
When cache is null, should a special MemoryLRUCacheBytesIterator be returned ?
|
Updated change in CachingKeyValueStore so that we rely on |
|
With current change, client is not concerned with completion of initialization. |
|
Thanks @tedyu , the change lgtm. Merging to trunk. |
…4978) Check whether cache is null before retrieving from cache. Reviewers: Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
|
Also cherry-picked to 1.1 @tedyu I realized that in If the cache is not available, we could skip returning a wrapper iterator that merges the two iterators from underlying and the cache, but just return the underlying iterator directly. Could you submit another PR for this? |
|
Does it make sense to delegate the query to the underlying store? The issue about the non-initialed cache is, that the store itself was not initialized yet. Thus, the underlying store is not initialized either. From my point of view, as long as as a store is not initialized and fully recovered, it should not be possible to query the store. We have some check in place that validate that stores are opened but we are hitting a race condition for this case. Overall, I think that we should throw an |
|
Since underlying is passed to CachingKeyValueStore ctor, it is usable before initInternal finishes. |
|
The ordering of initialization is the following:
The ordering of
So I think the race condition that @mjsax was talking about is around I think to fix this issue, in since it should be sufficient. In this case the check on step 3) will fail if |
|
Opened #4988 |
This is continuation of #4978. From Guozhang: I think to fix this issue, in init we could consider switching the steps of 1 and 2: initInternal(context); underlying.init(context, root); since volatile boolean open = false; it should be sufficient. In this case the check on step 3) will fail if underlying.init is not completed and we will throw InvalidStateStoreException. Reviewers: Guozhang Wang <wangguoz@gmail.com>
This is continuation of #4978. From Guozhang: I think to fix this issue, in init we could consider switching the steps of 1 and 2: initInternal(context); underlying.init(context, root); since volatile boolean open = false; it should be sufficient. In this case the check on step 3) will fail if underlying.init is not completed and we will throw InvalidStateStoreException. Reviewers: Guozhang Wang <wangguoz@gmail.com>
…-record-version * apache-github/trunk: KAFKA-6894: Improve err msg when connecting processor with global store (apache#5000) KAFKA-6893; Create processors before starting acceptor in SocketServer (apache#4999) MINOR: Fix typo in ConsumerRebalanceListener JavaDoc (apache#4996) MINOR: Remove deprecated valueTransformer.punctuate (apache#4993) MINOR: Update dynamic broker configuration doc for truststore update (apache#4954) KAFKA-6870 Concurrency conflicts in SampledStat (apache#4985) KAFKA-6361: Fix log divergence between leader and follower after fast leader fail over (apache#4882) KAFKA-6813: Remove deprecated APIs in KIP-182, Part II (apache#4976) KAFKA-6878 Switch the order of underlying.init and initInternal (apache#4988) KAFKA-6299; Fix AdminClient error handling when metadata changes (apache#4295) KAFKA-6878: NPE when querying global state store not in READY state (apache#4978) KAFKA 6673: Implemented missing override equals method (apache#4745) KAFKA-6834: Handle compaction with batches bigger than max.message.bytes (apache#4953)
…pache#4978) Check whether cache is null before retrieving from cache. Reviewers: Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>
…he#4988) This is continuation of apache#4978. From Guozhang: I think to fix this issue, in init we could consider switching the steps of 1 and 2: initInternal(context); underlying.init(context, root); since volatile boolean open = false; it should be sufficient. In this case the check on step 3) will fail if underlying.init is not completed and we will throw InvalidStateStoreException. Reviewers: Guozhang Wang <wangguoz@gmail.com>
Check whether cache is null before retrieving from cache.
Committer Checklist (excluded from commit message)