KAFKA-3522: Add in-memory TimestampedKeyValueStore #6151
mjsax wants to merge 4 commits into apache:trunk
Conversation
Call for review @guozhangwang @bbejeck @vvcephei
Using Arrays.copyOfRange here -- in other places, I use System.arraycopy() -- other code uses ByteBuffer -- not sure which one is best. Any insights?
Arrays.copyOfRange() uses System.arraycopy() after doing a range check, so I would use the former for safety. Not sure about ByteBuffer. Can you provide an example of where it is used?
- WindowKeySchema: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L111-L113
- SessionKeySchema: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionKeySchema.java#L143-L146
- SegmentedCacheFunction: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/internals/SegmentedCacheFunction.java#L45

And some more...
Interesting! I was unaware of this method.
The range check just makes sure that the "end" index isn't before the "start" index. This is probably just to give a nice error message, since it would otherwise result in a negative "length" argument and throw a different runtime exception (java.lang.ArrayIndexOutOfBoundsException).
It seems like the real advantage is the compile-time type safety it gives, since there's nothing preventing you from using System.arraycopy to copy from an int[] into a String[]. You'd get a runtime exception (java.lang.ArrayStoreException). Using Arrays.copyOfRange, you get a compile-time check that your types are right.
Plus, it creates the destination array for you, which is also handy.
Since Java seems to lack a structural array slice (i.e., a way to get a new array that is actually a view on a backing array), you have to make a copy to slice the array, so the current method seems appropriate.
ByteBuffer actually does let you do a structural slice, but afaict, any mechanism that gets an array from a ByteBuffer either gives you the whole (unsliced) backing array or makes a copy anyway. If the deserializer accepted a ByteBuffer, we could do these deserializations without making array copies, but as it is, it seems like the Deserializer interface is painting us into a corner by requiring arrays as input.
The reason (I think) that we use ByteBuffer sometimes is that its builder methods make it easy to pack and unpack data into and out of arrays. This can be an error-prone process in general, so it's sometimes nice. But what you're doing is pretty straightforward, and as long as the serializer and deserializer are symmetrical, it's going to be fine.
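To make the comparison concrete, here is a minimal sketch (not code from the PR) slicing a byte array both ways; the class name `SliceExample` is just for illustration:

```java
import java.util.Arrays;

public class SliceExample {
    public static void main(String[] args) {
        byte[] raw = {0, 1, 2, 3, 4, 5, 6, 7};

        // System.arraycopy: caller allocates the destination and passes explicit offsets.
        byte[] viaArraycopy = new byte[4];
        System.arraycopy(raw, 2, viaArraycopy, 0, 4);

        // Arrays.copyOfRange: allocates the destination for you and checks from <= to.
        byte[] viaCopyOfRange = Arrays.copyOfRange(raw, 2, 6);

        System.out.println(Arrays.equals(viaArraycopy, viaCopyOfRange)); // prints "true"
    }
}
```

Both produce the same copy; the difference is who allocates the destination and where a bad range fails.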
What is your conclusion on what we should use?
I'd vote to go with Arrays.copyOfRange since its extra check logic does not seem to incur a huge overhead, so although this is on the critical code path it may still not be a big perf regression.
As discussed in person, we agreed on using ByteBuffer.
Here I use System.arraycopy (cf. my comment above)
Seems fine to me. Just to illustrate the advantage of ByteBuffer, it would be:

```java
ByteBuffer
    .allocate(rawTimestamp.length + rawValue.length)
    .put(rawTimestamp)
    .put(rawValue)
    .array()
```

or

```java
ByteBuffer
    .wrap(new byte[rawTimestamp.length + rawValue.length])
    .put(rawTimestamp)
    .put(rawValue)
    .array()
```

It's both more compact and leaves less chance of messing up the ranges. But it's also functionally equivalent.
I personally have no preference; just answering your question.
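Wrapped as a complete, runnable sketch, the ByteBuffer variant and a System.arraycopy variant can be checked for equivalence; the helper names `concat` and `concatArraycopy` are illustrative, not from the PR:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConcatExample {
    // ByteBuffer variant from the comment above.
    static byte[] concat(final byte[] rawTimestamp, final byte[] rawValue) {
        return ByteBuffer
            .allocate(rawTimestamp.length + rawValue.length)
            .put(rawTimestamp)
            .put(rawValue)
            .array();
    }

    // Functionally equivalent System.arraycopy variant with explicit offsets.
    static byte[] concatArraycopy(final byte[] rawTimestamp, final byte[] rawValue) {
        final byte[] out = new byte[rawTimestamp.length + rawValue.length];
        System.arraycopy(rawTimestamp, 0, out, 0, rawTimestamp.length);
        System.arraycopy(rawValue, 0, out, rawTimestamp.length, rawValue.length);
        return out;
    }

    public static void main(String[] args) {
        byte[] ts = {0, 0, 0, 0, 0, 0, 0, 42};  // e.g. an 8-byte timestamp
        byte[] value = {1, 2, 3};
        System.out.println(Arrays.equals(concat(ts, value), concatArraycopy(ts, value))); // prints "true"
    }
}
```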
Ah. Here it is. There is no conclusion :)
It looks like this was maybe de-privatized for use in InMemoryTimestampedKeyValueStore, but that class has its own inner iterator class.
Should we restore the private modifier?
Ah. Actually this code should not be here -- it's the same as InMemoryKeyValueStore.InMemoryKeyValueIterator -- that's why the private modifier was removed.
Will remove this nested class to avoid code duplication.
Force-pushed from a7a89c9 to 4b4f0c0.
Updated this.
```java
import java.util.NavigableMap;
import java.util.TreeMap;

public class InMemoryTimestampedKeyValueStore<K, V> implements TimestampedKeyValueStore<K, V> {
```
Just curious, why are we implementing this as a separate class instead of wrapping the existing InMemoryKeyValueStore?
In the original KIP, we added new methods to the TimestampedKeyValueStore interface, e.g.:

```java
public synchronized ValueAndTimestamp<V> putIfAbsent(final K key,
                                                     final V value,
                                                     final long timestamp)
```

Hence, this PR is a little outdated, and I agree that we don't need InMemoryTimestampedKeyValueStore any longer -- I'll clean up and rebase the PR accordingly.
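For illustration, a minimal, self-contained sketch of such putIfAbsent semantics over the TreeMap backing shown in the diff -- the `TimestampedValue` holder class, `String` key type, and field names are hypothetical, not the actual KIP-258 types:

```java
import java.util.Map;
import java.util.TreeMap;

public class PutIfAbsentSketch {
    // Hypothetical holder pairing a value with its timestamp.
    static final class TimestampedValue {
        final byte[] value;
        final long timestamp;
        TimestampedValue(final byte[] value, final long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, TimestampedValue> map = new TreeMap<>();

    // Stores the value-and-timestamp only if the key is absent;
    // returns the previous entry, or null if the key was absent.
    public synchronized TimestampedValue putIfAbsent(final String key,
                                                     final byte[] value,
                                                     final long timestamp) {
        final TimestampedValue previous = map.get(key);
        if (previous == null) {
            map.put(key, new TimestampedValue(value, timestamp));
        }
        return previous;
    }

    public static void main(String[] args) {
        PutIfAbsentSketch store = new PutIfAbsentSketch();
        System.out.println(store.putIfAbsent("k", new byte[]{1}, 10L));           // prints "null"
        System.out.println(store.putIfAbsent("k", new byte[]{2}, 20L).timestamp); // prints "10"
    }
}
```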
Turns out, after rebasing there is nothing left on this PR. Closing it, because we don't need it any longer.
Part of KIP-258.