KAFKA-8029: In memory session store #6525
dguy
left a comment
Hi @ableegoldman, I left a comment about the remove logic. I guess this is still WIP and you'll add tests?
Not sure what this is doing. Seems like it should be:
if (keyMap.isEmpty()) {
endTimeMap.remove(sessionKey.window().end());
}
Hm, not sure what happened here either. Thanks for catching this, will fix
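For illustration, a minimal sketch of the cleanup the suggestion above points at. It assumes the store nests its maps as end time -> key -> start time -> value; only the names `endTimeMap`, `keyMap`, and `startTimeMap` come from this thread, while the class name, the `String` keys, and the method signatures are hypothetical stand-ins rather than the PR's actual code.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical stand-in for the store internals: endTime -> key -> startTime -> value.
final class NestedSessionMapSketch {
    final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<String, ConcurrentNavigableMap<Long, byte[]>>> endTimeMap =
        new ConcurrentSkipListMap<>();

    void put(final String key, final long startTime, final long endTime, final byte[] value) {
        endTimeMap.computeIfAbsent(endTime, t -> new ConcurrentSkipListMap<>())
                  .computeIfAbsent(key, k -> new ConcurrentSkipListMap<>())
                  .put(startTime, value);
    }

    void remove(final String key, final long startTime, final long endTime) {
        final ConcurrentNavigableMap<String, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(endTime);
        if (keyMap != null) {
            final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(key);
            if (startTimeMap != null) {
                startTimeMap.remove(startTime);
                // Prune empty inner maps so the outer maps do not accumulate dead entries.
                if (startTimeMap.isEmpty()) {
                    keyMap.remove(key);
                    if (keyMap.isEmpty()) {
                        endTimeMap.remove(endTime);
                    }
                }
            }
        }
    }
}
```

The point of the sketch is only the pruning order: the end-time entry is dropped once its `keyMap` is empty, and not before.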
Tests added and passing, ready for review @dguy @bbejeck @vvcephei @mjsax @guozhangwang
b0e0a7e to 0ba2916
Thanks for the PR, @ableegoldman. I've made a pass and I have some comments on test coverage:
- In the `put` method, the `else` block on line 123 isn't tested (passing in a `null` aggregate value)
- no coverage on `fetchSession` or the `fetch` methods
Java 8 and Java 11 failed, results cleaned up already. retest this please
@bbejeck Thanks, I've expanded the test coverage to those methods -- I also went ahead and added the tests to the RocksDBSessionStoreTest, which this class largely mirrors
vvcephei
left a comment
Hey @ableegoldman, thanks for the PR! Sorry it took so long to review it.
private final String name;
private final String metricScope;
private InternalProcessorContext context;

this.metricScope = metricScope;

this.openIterators = ConcurrentHashMap.newKeySet();
this.endTimeMap = new ConcurrentSkipListMap<>();
nit: we don't need qualifiers on these references. We could also initialize them in the declaration, but it's your call.
Actually I agree, it's cleaner to initialize in the declarations ... I was trying to be consistent with the existing stores but that's really no reason here
}

@Override
@SuppressWarnings("unchecked")
I think this suppression is unnecessary?
public class InMemorySessionStoreTest {

private final String storeName = "InMemorySessionStore";
nit (suggested change):
- private final String storeName = "InMemorySessionStore";
+ private static final String storeName = "InMemorySessionStore";
public class InMemorySessionStoreTest {

private final String storeName = "InMemorySessionStore";
private final static long RETENTION_PERIOD = 10_000L;
nit (suggested change):
- private final static long RETENTION_PERIOD = 10_000L;
+ private static final long RETENTION_PERIOD = 10_000L;
final String keyFrom = Serdes.String().deserializer().deserialize("", Serdes.Integer().serializer().serialize("", -1));
final String keyTo = Serdes.String().deserializer().deserialize("", Serdes.Integer().serializer().serialize("", 1));

final KeyValueIterator iterator = sessionStore.findSessions(keyFrom, keyTo, 0L, 10L);
This type is not parameterized. It's generally better to list the parameters when you reference a parameterized type.
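To make the point about raw types concrete, here is a small sketch. It uses `java.util.Iterator` over `Map.Entry<String, Long>` as a stand-in for the store's iterator type; the `findSessions()` helper and the `String`/`Long` type arguments are assumptions for illustration only, not the signature used in the test under review.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class ParameterizedIteratorSketch {
    // Hypothetical stand-in for a store lookup that returns an iterator of key/value pairs.
    static Iterator<Map.Entry<String, Long>> findSessions() {
        final List<Map.Entry<String, Long>> sessions = new ArrayList<>();
        sessions.add(new SimpleEntry<>("a", 1L));
        sessions.add(new SimpleEntry<>("b", 2L));
        return sessions.iterator();
    }

    static long sumValues() {
        // With a raw declaration ("final Iterator iterator = ..."), next() would return Object
        // and the call to getValue() below would need an unchecked cast.
        final Iterator<Map.Entry<String, Long>> iterator = findSessions();
        long sum = 0L;
        while (iterator.hasNext()) {
            sum += iterator.next().getValue();
        }
        return sum;
    }
}
```

Listing the type parameters lets the compiler check the element type at every use, which is also what tends to make `@SuppressWarnings("unchecked")` unnecessary.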
import org.junit.Before;
import org.junit.Test;

public class InMemorySessionStoreTest {
missing coverage on:
- expired segments
- retention time
- fetchSession, which doesn't find a session
- fetch
Can you clarify:
- fetch (both fetch methods have unit tests?)
- what is the difference testing-wise between expired segments and retention time (technically there are no "segments" for this store)? There is a test shouldRemoveExpired()
- Good thought Re: fetchSession not finding a session. Currently this will cause an NPE, which is true for the equivalent of fetchSession in every store implementation -- none of the other stores test for this. This seems like a bug? I would argue that returning null when finding nothing seems like the appropriate behavior for the underlying stores and the bug is in the deserializing layer, which should check for null before blindly deserializing. WDYT?
Sure, I was just going by the "Run InMemorySessionStoreTest with Code Coverage" feature in IDEA. I don't believe in shooting for 100% coverage or anything, I just mentioned the things that looked like you probably wanted to test.
- In `InMemorySessionStore`, lines 186 and 188 (`fetch(final Bytes key)`), and 193 and 195 (`fetch(final Bytes from, final Bytes to)`) are never invoked.
- I was just referring to `InMemorySessionStore` lines 105 and 106 (during `put`) as expired records.
  For "retention period" I meant line 147, where `fetchSession` returns null. But upon second reading, I see that fetchSession might be returning null for several reasons (which illustrates my point about KAFKA-8029: In memory session store #6525 (comment)). Regardless, the fall-through case is not covered.
- Interesting, yeah, it doesn't seem like this should cause an NPE. I see at least one usage in the code where we check the result for null, so apparently, we do expect this to return null sometimes. It sounds like a bug to me, either because the deserializer should preserve nulls, or because we should skip the deserializer for nulls (both patterns are present in the code).
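A minimal sketch of the second pattern mentioned above (skip the deserializer for nulls), assuming the underlying store returns `null` when no session is found. The helper name `deserializeOrNull` and the use of `java.util.function.Function` in place of a real deserializer are hypothetical; this is not the fix that #6575 makes.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

final class NullSafeDeserializeSketch {
    // Hypothetical helper: only invoke the deserializer when the store returned bytes.
    static <T> T deserializeOrNull(final byte[] rawValue, final Function<byte[], T> deserializer) {
        return rawValue == null ? null : deserializer.apply(rawValue);
    }

    // Stand-in deserializer that would throw an NPE if handed null.
    static String asString(final byte[] rawValue) {
        return new String(rawValue, StandardCharsets.UTF_8);
    }
}
```

With this shape, a missing session surfaces as `null` to the caller instead of an NPE from the deserializing layer; the alternative is to make every deserializer preserve nulls itself.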
All three of these are actually unrelated to InMemorySessionStore and require some fixes before they can be addressed/properly tested here -- see #6575
Thanks @ableegoldman, are you planning to wait for #6575 to be merged before proceeding with this PR, or just not worry about it right now?
I think we should not worry about that here, and add corresponding tests for null fetchSession results in that PR?
return startTimeMap.get(startTime);
}
}
return null;
This has been a point of debate in the past, and not everyone agrees with me, but I'll bring it up just once because it's my honest feedback...
This style of having one-sided conditionals and early returns generally makes it harder to trace the flow of execution. It's more verbose, but also more resistant to bugs, to prefer keeping both if and else branches (or using ternary operators), and (again, generally) not returning early or falling through to a catch-all return at the end of a method.
This feedback applies elsewhere in this PR as well.
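For comparison, a small sketch of the two styles side by side. The class, the method names, and the `Map<String, Map<Long, byte[]>>` shape are illustrative assumptions; only `startTimeMap` and the trailing `return null` echo the snippet under discussion.

```java
import java.util.Map;

final class BranchStyleSketch {
    // One-sided conditional with an early return and a catch-all return at the end.
    static byte[] earlyReturn(final Map<String, Map<Long, byte[]>> keyMap, final String key, final long startTime) {
        final Map<Long, byte[]> startTimeMap = keyMap.get(key);
        if (startTimeMap != null) {
            return startTimeMap.get(startTime);
        }
        return null;
    }

    // Both branches spelled out, with a single return at the end.
    static byte[] bothBranches(final Map<String, Map<Long, byte[]>> keyMap, final String key, final long startTime) {
        final Map<Long, byte[]> startTimeMap = keyMap.get(key);
        final byte[] result;
        if (startTimeMap != null) {
            result = startTimeMap.get(startTime);
        } else {
            result = null;
        }
        return result;
    }
}
```

Both methods behave the same; the second makes the "not found" case an explicit branch, and the compiler's definite-assignment check on `result` flags any path that forgets to set it.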
Hm, I see your point. I think this is unavoidable in places but some other methods (hasNext) can be cleaned up
Committing suggestions from code review Co-Authored-By: ableegoldman <ableegoldman@gmail.com>
Thanks for the revision, @ableegoldman. In case the results are gone by the time you look, the tests didn't run because of an unused import:
Hey @ableegoldman, I was just thinking about the test in here... It seems like, for the most part, we expect the in-memory and persistent store to share the same behavior (as that behavior is defined by their common interface). I'm wondering if it would simplify and consolidate our testing if we just write one test to verify that behavior and swap in the different implementations. I have an example of one way to do this in my "persistent suppress" branch: https://github.com/apache/kafka/blob/c386da2119d51298e4302d82571ecacbbfed0dc9/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.java WDYT?
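One way to picture the "one test, several implementations" idea is the plain-Java sketch below. Everything in it is hypothetical: `TinyStore`, `HashStore`, and `TreeStore` stand in for the real store interface and its in-memory and persistent implementations, and the linked test may well be structured differently (for example with a parameterized test runner).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

final class SharedContractSketch {
    // Hypothetical minimal interface; not the Kafka Streams SessionStore API.
    interface TinyStore {
        void put(String key, Long value);
        Long get(String key);
    }

    static final class HashStore implements TinyStore {
        private final Map<String, Long> map = new HashMap<>();
        public void put(final String key, final Long value) { map.put(key, value); }
        public Long get(final String key) { return map.get(key); }
    }

    static final class TreeStore implements TinyStore {
        private final Map<String, Long> map = new TreeMap<>();
        public void put(final String key, final Long value) { map.put(key, value); }
        public Long get(final String key) { return map.get(key); }
    }

    // The contract is written once, against the interface only.
    static boolean satisfiesContract(final TinyStore store) {
        store.put("a", 1L);
        store.put("a", 2L);
        final boolean overwrites = Long.valueOf(2L).equals(store.get("a"));
        final boolean missingIsNull = store.get("missing") == null;
        return overwrites && missingIsNull;
    }

    // Each implementation is swapped in through a factory.
    static boolean allImplementationsPass() {
        final List<Supplier<TinyStore>> factories = new ArrayList<>();
        factories.add(HashStore::new);
        factories.add(TreeStore::new);
        boolean allPass = true;
        for (final Supplier<TinyStore> factory : factories) {
            allPass &= satisfiesContract(factory.get());
        }
        return allPass;
    }
}
```

Adding another implementation then means adding one factory, not copying the test class.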
@vvcephei I was thinking about that (since I did end up copy/pasting many of the tests from the RocksDBSessionStore). However, I'm not 100% convinced it would be a good idea to merge them, since the two stores do actually differ in the order of results returned by an iterator. I don't think this discrepancy actually comes up with any of the existing tests; I'm just concerned that in the future this might trip someone up and seem like a bug even if test coverage is expanded. WDYT?
Interesting... I guess the challenge would be to write the test in a way that permits permissible differences, while enforcing the desired contract. In other words, what is the actual contract in terms of iteration order (etc.)? It would actually be incredibly useful to codify that contract in a test, so we know what behavior changes are legal and which ones aren't. For example, there's very low utility in a test that expects a specific iteration order, when that iteration order itself isn't important. That's how you wind up rewriting tests in conjunction with code changes, which is risky.
Fair point. Since the iterators make no guarantees Re: ordering, we should probably rewrite the tests to ignore the order. I think it's worth pushing this to a follow-up PR and combining the in-memory/RocksDB tests for all store types (all key-value type stores are already consolidated, actually). At some point we should do this for window and session stores too, but I scoped this out and believe it would be a nontrivial time investment, so it's probably not worth prioritizing now. But as a first step I think we could/should consolidate at least the two flavors (in-memory/RocksDB) of window/session store tests.
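A sketch of what "ignore the order" could look like in a test helper, assuming the iterator results contain no duplicates (so a set comparison loses nothing). The class and method names are hypothetical and not part of the existing test utilities.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

final class OrderInsensitiveSketch {
    // Collects everything the iterator yields into a set, discarding iteration order.
    static <T> Set<T> drain(final Iterator<T> iterator) {
        final Set<T> elements = new HashSet<>();
        while (iterator.hasNext()) {
            elements.add(iterator.next());
        }
        return elements;
    }

    // True when both iterators yield the same elements, in any order.
    static <T> boolean sameElements(final Iterator<T> expected, final Iterator<T> actual) {
        return drain(expected).equals(drain(actual));
    }
}
```

A test written this way passes for any store whose iterator returns the right sessions, whichever order it returns them in. If duplicates ever mattered, a sorted list or a multiset comparison would be needed instead.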
Sounds good. As long as we do it before the release, I'm happy. I'm just a little skittish because I actually did find some subtle bugs in the in-memory suppression buffer when I wrote that test I linked to earlier. Although, I guess the risk is less for you because you just copy/pasted the test over, so it should be covering the same cases.
@Test
public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
LogCaptureAppender.setClassLoggerToDebug(InMemoryWindowStore.class);
Shouldn't this rather be InMemorySessionStore.class?
Whoops. Good catch!
LGTM. Thanks @ableegoldman!
* ak/trunk: (42 commits)
  KAFKA-8134: `linger.ms` must be a long
  KAFKA-7779; Avoid unnecessary loop iteration in leastLoadedNode (apache#6081)
  MINOR: Update Gradle to 5.4.1 and update its plugins (apache#6436)
  MINOR: improve Session expiration notice (apache#6618)
  KAFKA-8029: In memory session store (apache#6525)
  MINOR: In-memory stores cleanup (apache#6595)
  KAFKA-7862 & KIP-345 part-one: Add static membership logic to JoinGroup protocol (apache#6177)
  KAFKA-8254: Pass Changelog as Topic in Suppress Serdes (apache#6602)
  KAFKA-7903: automatically generate OffsetCommitRequest (apache#6583)
  KAFKA-8291 : System test fix (apache#6637)
  MINOR: Do not log retriable offset commit exceptions as errors (apache#5904)
  MINOR: Fix log message error of loadTransactionMetadata (apache#6571)
  MINOR: Fix 404 security features links (apache#6634)
  MINOR: Remove an unnecessary character from broker's startup log
  MINOR: Make LogCleaner.shouldRetainRecord more readable (apache#6590)
  MINOR: Remove implicit return statement (apache#6629)
  KAFKA-8237; Untangle TopicDeleteManager and add test cases (apache#6588)
  KAFKA-8227 DOCS Fixed missing links duality of streams tables (apache#6625)
  MINOR: reformat settings.gradle to be more readable (apache#6621)
  MINOR: Correct RestServerTest formatting
  ...

Conflicts:
  build.gradle
  settings.gradle
First pass at an in-memory session store implementation. Reviewers: Simon Geisler, Damian Guy <damian@confluent.io>, John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
First pass at an in-memory session store implementation. Waiting for KIP to be voted on/accepted