KAFKA-7652: Part II; Add single-point query for SessionStore and use for flushing / getter #6161
Conversation
  final AGG newValue = serdes.valueFrom(entry.newValue());
- final AGG oldValue = newValue == null || sendOldValues ? fetchPrevious(rawKey, key.window()) : null;
+ final AGG oldValue = newValue == null || sendOldValues ?
+     serdes.valueFrom(bytesStore.fetch(rawKey, key.window().start(), key.window().end())) :
This is the actual fix.
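The shape of the fix can be modeled with a small self-contained sketch. The names below (`storeFetch`, `oldValueFor`) are hypothetical stand-ins for the underlying bytes store and the flush logic in the caching layer; the point is that the old value is read back with the exact window bounds of the flushed entry, and only when it will actually be forwarded downstream.

```java
public class FlushOldValueSketch {

    // hypothetical stand-in for the underlying session bytes store:
    // an exact-window lookup keyed by (start, end)
    static String storeFetch(long start, long end) {
        return "old@" + start + "-" + end;
    }

    // mirrors the structure of the fixed ternary: fetch the previous value
    // with the exact window bounds only when it is needed (tombstone or
    // sendOldValues enabled), otherwise skip the store read entirely
    static String oldValueFor(String newValue, boolean sendOldValues, long start, long end) {
        return (newValue == null || sendOldValues) ? storeFetch(start, end) : null;
    }

    public static void main(String[] args) {
        System.out.println(oldValueFor("agg", true, 2, 4));  // sendOldValues: exact fetch -> old@2-4
        System.out.println(oldValueFor(null, false, 2, 4));  // tombstone: old value still fetched -> old@2-4
        System.out.println(oldValueFor("agg", false, 2, 4)); // neither: no store read -> null
    }
}
```

The third case is why the check matters for performance: when no old value is forwarded, the store is never touched.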
  }
      return value;
  }
  return store.fetch(key.key(), key.window().start(), key.window().end());
This is a good-to-have for perf.
  final Windowed<Bytes> a = new Windowed<>(keyA, new SessionWindow(2, 4));
  final Windowed<Bytes> b = new Windowed<>(keyA, new SessionWindow(1, 2));
  final Windowed<String> aDeserialized = new Windowed<>("a", new SessionWindow(2, 4));
  final Windowed<String> bDeserialized = new Windowed<>("a", new SessionWindow(1, 2));
This additional record is for illustrating the bug. Without the fix the flushed entries would be incorrect.
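A minimal in-memory model (not the real RocksDB-backed store) shows why the second session triggers the bug: with two sessions for the same key, a range query with `findSessions`-style semantics (end >= earliestEnd, start <= latestStart) can match the wrong session, while an exact single-point lookup cannot. All class and method names here are hypothetical.

```java
import java.util.Comparator;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class SessionLookupSketch {

    // hypothetical single-key session store: (start, end) window -> value
    static final NavigableMap<long[], String> sessions = new TreeMap<>(
            Comparator.<long[]>comparingLong(w -> w[0]).thenComparingLong(w -> w[1]));

    static {
        sessions.put(new long[]{1, 2}, "old"); // session (1, 2), like window b above
        sessions.put(new long[]{2, 4}, "new"); // session (2, 4), like window a above
    }

    // range semantics in the spirit of findSessions: any session whose
    // end >= earliestEnd and start <= latestStart qualifies
    static String findFirstOverlapping(long earliestEnd, long latestStart) {
        for (Map.Entry<long[], String> e : sessions.entrySet()) {
            if (e.getKey()[1] >= earliestEnd && e.getKey()[0] <= latestStart) {
                return e.getValue();
            }
        }
        return null;
    }

    // single-point semantics of the new fetch: exact window match only
    static String fetchExact(long start, long end) {
        for (Map.Entry<long[], String> e : sessions.entrySet()) {
            if (e.getKey()[0] == start && e.getKey()[1] == end) {
                return e.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // looking up session (2, 4): the range query matches (1, 2) first,
        // since 2 >= 2 and 1 <= 4 -- the wrong session's value is returned
        System.out.println(findFirstOverlapping(2, 4)); // "old"
        System.out.println(fetchExact(2, 4));           // "new"
    }
}
```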
- public static byte[] toBinary(final Windowed<Bytes> sessionKey) {
-     final byte[] bytes = sessionKey.key().get();
+ public static Bytes toBinary(final Bytes key,
This is just some refactoring of the code to 1) merge the logic into a single function and 2) do Bytes.wrap internally so that callers do not need to call it themselves.
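A sketch of the merged helper, for illustration only: the exact byte layout below ([key bytes][end timestamp][start timestamp]) is an assumption about the session-key encoding, and a plain `byte[]` stands in for the `Bytes` wrapper the real refactor returns.

```java
import java.nio.ByteBuffer;

public class SessionKeyBinarySketch {

    static final int TIMESTAMP_SIZE = 8;

    // merged toBinary in the spirit of the refactor: takes the raw key plus
    // both window bounds directly, so callers no longer assemble a
    // Windowed<Bytes> or wrap the result themselves.
    // Assumed layout: [key bytes][end, 8 bytes][start, 8 bytes]
    static byte[] toBinary(byte[] key, long startTime, long endTime) {
        ByteBuffer buf = ByteBuffer.allocate(key.length + 2 * TIMESTAMP_SIZE);
        buf.put(key);
        buf.putLong(endTime);
        buf.putLong(startTime);
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] binary = toBinary(new byte[]{'a'}, 2L, 4L);
        // 1 key byte + two 8-byte timestamps
        System.out.println(binary.length); // 17
    }
}
```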
Thanks for the patch @guozhangwang. I made a pass over this, and I have a couple of minor comments, but otherwise, this looks good to me.
I do think we need a KIP for the addition to ReadOnlySessionStore, but the use case is clean and straightforward, so IMHO we should be able to get this in for 2.2.
  }
  cache.addDirtyEntryFlushListener(cacheName, entries -> {
      for (final ThreadCache.DirtyEntry entry : entries) {
          putAndMaybeForward(entry, context);
  validateStoreOpen();
  final Bytes bytesKey = SessionKeySchema.toBinary(key, startTime, endTime);
  final Bytes cacheKey = cacheFunction.cacheKey(bytesKey);
  if (cache == null) {
nit: move the cache == null check two lines up to avoid creating bytesKey and cacheKey when the cache is null. I know this is a minor point, but IMHO still worth the change.
  } catch (final InvalidStateStoreException ise) {
      throw new InvalidStateStoreException("State store [" + storeName + "] is not available anymore" +
          " and may have been migrated to another instance; " +
          "please re-discover its location from the state metadata.");
nit: do we want to include the original InvalidStateStoreException (ise) in the new exception we are throwing?
We intentionally changed the error message while keeping the same InvalidStateStoreException type in the original design. But thinking about it again, I think it is still worth exposing the original exception for debugging purposes.
  @Override
  public AGG fetch(final K key, final long startTime, final long endTime) {
      return serdes.valueFrom(bytesStore.get(SessionKeySchema.toBinary(Bytes.wrap(serdes.rawKey(key)), startTime, endTime)));
nit: validate key is not null here?
This is already validated higher in the call trace (MeteredXX).
@bbejeck updated per comments.
  " and may have been migrated to another instance; " +
- "please re-discover its location from the state metadata.");
+ "please re-discover its location from the state metadata. " +
+ "Original error message: " + ise.toString());
Why not just include ise as the cause? Do we want to hide its stack trace or something?
Yes, this is intentional. Since this is a user-facing interface, we are abstracting the internal stack trace away from users.
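The design choice above can be sketched in isolation: the rethrown exception embeds the original message text but deliberately sets no cause, so no internal stack trace reaches the user. The class and method names are stand-ins for illustration.

```java
public class StoreExceptionSketch {

    // minimal stand-in for the real exception type
    static class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(String msg) {
            super(msg);
        }
    }

    // rethrow with a user-facing message that includes the original error's
    // text, but intentionally omits it as the cause so its internal stack
    // trace is not exposed
    static RuntimeException wrap(String storeName, InvalidStateStoreException ise) {
        return new InvalidStateStoreException("State store [" + storeName + "] is not available anymore" +
                " and may have been migrated to another instance; " +
                "please re-discover its location from the state metadata. " +
                "Original error message: " + ise.toString());
    }

    public static void main(String[] args) {
        RuntimeException e = wrap("my-store", new InvalidStateStoreException("store closed"));
        System.out.println(e.getMessage().contains("store closed")); // true: original text preserved
        System.out.println(e.getCause());                            // null: no wrapped stack trace
    }
}
```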
KIP-420 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores) was voted in before the 2.2 KIP deadline. Merging to trunk now.
* AK/trunk:
  * fix typo (apache#5150)
  * MINOR: Reduce replica.fetch.backoff.ms in ReassignPartitionsClusterTest (apache#5887)
  * KAFKA-7766: Fail fast PR builds (apache#6059)
  * KAFKA-7798: Expose embedded clientIds (apache#6107)
  * KAFKA-7641; Introduce "group.max.size" config to limit group sizes (apache#6163)
  * KAFKA-7433; Introduce broker options in TopicCommand to use AdminClient (KIP-377)
  * MINOR: Fix some field definitions for ListOffsetReponse (apache#6214)
  * KAFKA-7873; Always seek to beginning in KafkaBasedLog (apache#6203)
  * KAFKA-7719: Improve fairness in SocketServer processors (KIP-402) (apache#6022)
  * MINOR: fix checkstyle suppressions for generated RPC code to work on Windows
  * KAFKA-7859: Use automatic RPC generation in LeaveGroups (apache#6188)
  * KAFKA-7652: Part II; Add single-point query for SessionStore and use for flushing / getter (apache#6161)
  * KAFKA-3522: Add RocksDBTimestampedStore (apache#6149)
  * KAFKA-3522: Replace RecordConverter with TimestampedBytesStore (apache#6204)
#2972 tried to fix a bug in the flushing operation, but it was not complete, since `findSessions(key, earliestEnd, latestStart)` does not guarantee to return only a single entry: its semantics are to return any sessions whose end > earliestEnd and whose start < latestStart. I've tried various ways to fix it completely, and I ended up having to add a single-point query to the public ReadOnlySessionStore API for the exact needed semantics. It is used for flushing to read the old values (otherwise the wrong old values would be sent downstream, hence it is a correctness issue) and also for getting the value for value-getters (this is for perf only).
Committer Checklist (excluded from commit message)