Conversation
| }; | ||
| } | ||
|
|
||
| public static class WindowStoreFacade<A, B> implements WindowStore<A, B>, RecordConverter { |
There was a problem hiding this comment.
QQ:
- Does this class need to be a public static one?
- Can it extend form
ReadOnlyWindowStoreFacade?
There was a problem hiding this comment.
I guess in the end we will find these classes a better place (currently they are a bit scattered, e.g. KeyValueStoreFacade is in the materializer class).
There was a problem hiding this comment.
I guess those -- this PR is a POC and hacked together. I would never recommend to merge it... There are many issue like this.
| public WindowStoreFacade(final WindowStore<A, ValueAndTimestamp<B>> store) { | ||
| this.inner = store; | ||
| final StateStore rootStore = store instanceof WrappedStateStore ? ((WrappedStateStore) store).inner() : store; | ||
| innerRecordConvert = rootStore instanceof RecordConverter ? (RecordConverter) rootStore : new DefaultRecordConverter(); |
There was a problem hiding this comment.
This makes me thinking about WrappedStateStore in which we do not check inner but blindly assumes it always implement this interface:
public ConsumerRecord<byte[], byte[]> convert(final ConsumerRecord<byte[], byte[]> record) {
return ((RecordConverter) innerState).convert(record);
}
Is that safe? Could it be a non-caching, non-logging store (i.e. only one layer of wrapper on metered, and the inner is already the inner-most underlying store which is customized store not implementing the interface)?
There was a problem hiding this comment.
That could be the case -- this part of the code is not extensively tested yet. I was aware of it already on the old "big PR" and planned to address it, on the smaller PR when we actually add upgrade tests ect.
| metrics = (StreamsMetricsImpl) context.metrics(); | ||
|
|
||
| otherWindow = (WindowStore<K, V2>) context.getStateStore(otherWindowName); | ||
| StateStore store = context.getStateStore(otherWindowName); |
There was a problem hiding this comment.
Hmm.. we are pealing off two layers here. The second layer is window-store-facade, what's the first layer of WrappedStateStore?
Ditto elsewhere.
There was a problem hiding this comment.
For some stores, it sometimes one layer, sometimes multiple. Thus, this is a guard to get the right store for any case -- I use the same code for all DSL processor for simplicity in this PR. Might not be necessary for all processors.
| final ValueAndTimestamp<V> parentValueAndTimestamp = parentGetter.get(key); | ||
| final V parentValue = parentValueAndTimestamp == null ? null : parentValueAndTimestamp.value(); | ||
| final V1 resultValue = computeValue(key, parentValue); | ||
| return resultValue == null ? null : ValueAndTimestamp.make(resultValue, parentValueAndTimestamp.timestamp()); // this is a potential NPE -- if `parentValueAndTimestamp == null` we could also directly return `null`, but this would be a semantic change |
There was a problem hiding this comment.
Why can't we return -1 if the timestamp is null like we did in other operators?
There was a problem hiding this comment.
The comment was for the old code. Not sure if it still applies.
however, the concern was to return ValueAndTimestamp(null, -1) instead of null -- I think, the former would be incorrect.
| R newValue = null; | ||
| final V1 value1 = valueGetter1.get(key); | ||
| final V2 value2 = valueGetter2.get(key); | ||
| long resultTimestamp = -1; |
There was a problem hiding this comment.
nit: I'd suggest use a constant instead of hard-coded -1: we can reuse RecordQueue.UNKNOWN e.g.
| resultTimestamp = oldAggWithTimestamp.timestamp(); | ||
| } else { | ||
| oldAgg = null; | ||
| resultTimestamp = context().timestamp(); |
There was a problem hiding this comment.
In other places we use -1 whereas here we use context.timestamp. Could you elaborated a bit on the principles applied here?
There was a problem hiding this comment.
We use context.timestamp() because resultTimestamp = Math.max(resultTimestamp, context().timestamp()); with resultTimestamp==-1 is the exact some thing. Compare the lines below: if we init a new value, the result timestamp is the timestamp of the current input record (no need to use Max.max(-1, context().timestamp())` to compute it.
| * @return StoreBuilder | ||
| */ | ||
| public StoreBuilder<KeyValueStore<K, V>> materialize() { | ||
| KeyValueBytesStoreSupplier supplier = (KeyValueBytesStoreSupplier) materialized.storeSupplier(); |
There was a problem hiding this comment.
This looks a bit suspicious to me: users could also call Stores.persistentKeyValueWithTimestampStore and passed in the returned object o Materialized, in which case V is already a ValueAndTimestamp<V1> right?
There was a problem hiding this comment.
Yes. In fact, we expect the supplier to return a store that can handle ValueAndTimestamp.
However, we need the return type of the supplier to be a byte store -- and thus, we cannot distinguish between non-timestamp-byte-stores and timestamped-byte-store easily. However, we cover this case at an outer level and wrap a plain-key-value store with a proxy if a RecordConverter is not implemented (was is only an indicator)
In the end, because we want all most-inner stores to be plain-byte stores, we can't distinguish them -- however, if we exclude the upgrade of an existing store into a new store, both work work for both cases because both only take bytes, and only use the key for lookups -- basically both stores are agnostic is the value bytes are the one or the other.
More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.
Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.
Committer Checklist (excluded from commit message)