diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java index b462895209007..67bf5734590e1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java @@ -141,7 +141,8 @@ public void process(final Record> record) { throw new StreamsException("Record key for the grouping KTable should not be null."); } - if (useVersionedSemantics && !record.value().isLatest) { + final boolean isLatest = record.value().isLatest; + if (useVersionedSemantics && !isLatest) { // skip out-of-order records when aggregating a versioned table, since the // aggregate should include latest-by-timestamp records only. as an optimization, // do not forward the out-of-order record downstream to the repartition topic either. @@ -154,8 +155,6 @@ public void process(final Record> record) { final KeyValue oldPair = record.value().oldValue == null ? null : mapper.apply(record.key(), record.value().oldValue); - final boolean isLatest = record.value().isLatest; - // if the selected repartition key or value is null, skip // forward oldPair first, to be consistent with reduce and aggregate final boolean oldPairNotNull = oldPair != null && oldPair.key != null && oldPair.value != null; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java index ee731b216a120..c8b3163fb3e8f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java @@ -118,16 +118,16 @@ public void init(final ProcessorContext> context) { public void process(final Record> record) { final VOut newValue = valueTransformer.transform(record.key(), record.value().newValue); - if (queryableName == null) { - final VOut oldValue = sendOldValues ? valueTransformer.transform(record.key(), record.value().oldValue) : null; - context().forward(record.withValue(new Change<>(newValue, oldValue, record.value().isLatest))); - } else { + if (queryableName != null) { final VOut oldValue = sendOldValues ? getValueOrNull(store.get(record.key())) : null; final long putReturnCode = store.put(record.key(), newValue, record.timestamp()); // if not put to store, do not forward downstream either if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) { tupleForwarder.maybeForward(record.withValue(new Change<>(newValue, oldValue, putReturnCode == PUT_RETURN_CODE_IS_LATEST))); } + } else { + final VOut oldValue = sendOldValues ? valueTransformer.transform(record.key(), record.value().oldValue) : null; + context().forward(record.withValue(new Change<>(newValue, oldValue, record.value().isLatest))); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java index 3af8d901d849e..0d56b947dbbcc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/KeyValueStore.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.state; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import java.util.List; @@ -36,6 +37,7 @@ public interface KeyValueStore extends StateStore, ReadOnlyKeyValueStore extends StateStore, ReadOnlyKeyValueStore extends StateStore, ReadOnlyKeyValueStore> entries); @@ -65,6 +69,7 @@ public interface KeyValueStore extends StateStore, ReadOnlyKeyValueStore reverseAll() { * @param Prefix Serializer type * @param

Prefix Type. * @return The iterator for keys having the specified prefix. + * @throws InvalidStateStoreException if the store is not initialized */ default , P> KeyValueIterator prefixScan(P prefix, PS prefixKeySerializer) { throw new UnsupportedOperationException(); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java index e8f7ee249c8f0..40faaf003d3fe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/VersionedKeyValueStore.java @@ -60,6 +60,7 @@ public interface VersionedKeyValueStore extends StateStore { * validTo timestamp is undefined. {@code Long.MIN_VALUE} indicates that the record * was not put, due to grace period having been exceeded. * @throws NullPointerException If {@code null} is used for key. + * @throws InvalidStateStoreException if the store is not initialized */ long put(K key, V value, long timestamp); @@ -96,6 +97,7 @@ public interface VersionedKeyValueStore extends StateStore { * returned {@link VersionedRecord} may be smaller than the provided deletion * timestamp. * @throws NullPointerException If {@code null} is used for key. + * @throws InvalidStateStoreException if the store is not initialized */ VersionedRecord delete(K key, long timestamp); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java index 86c82fa55bf2b..d01d08dcb39f9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java @@ -44,6 +44,7 @@ public interface WindowStore extends StateStore, ReadOnlyWindowStore * @param key The key to associate the value to * @param value The value; can be null * @param windowStartTimestamp The timestamp of the beginning of the window to put the key/value into + * @throws InvalidStateStoreException if the store is not initialized * @throws NullPointerException if the given key is {@code null} */ void put(K key, V value, long windowStartTimestamp); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java index c27fe4925fc99..8260f1a0bff4c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java @@ -86,8 +86,8 @@ private class MeteredVersionedKeyValueStoreInternal extends MeteredKeyValueStore> { private final VersionedBytesStore inner; - private final Serde rawValueSerde; - private StateSerdes rawValueSerdes; + private final Serde plainValueSerde; + private StateSerdes plainValueSerdes; MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner, final String metricScope, @@ -104,13 +104,13 @@ private class MeteredVersionedKeyValueStoreInternal : new ValueAndTimestampSerde<>(valueSerde) ); this.inner = inner; - this.rawValueSerde = valueSerde; + this.plainValueSerde = valueSerde; } public long put(final K key, final V value, final long timestamp) { Objects.requireNonNull(key, "key cannot be null"); try { - final long validTo = maybeMeasureLatency(() -> inner.put(keyBytes(key), rawValueSerdes.rawValue(value), timestamp), time, putSensor); + final long validTo = maybeMeasureLatency(() -> inner.put(keyBytes(key), plainValueSerdes.rawValue(value), timestamp), time, putSensor); maybeRecordE2ELatency(); return validTo; } catch (final ProcessorStateException e) { @@ -178,10 +178,10 @@ protected void initStoreSerde(final ProcessorContext context) { // additionally init raw value serde final String storeName = super.name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - rawValueSerdes = new StateSerdes<>( + plainValueSerdes = new StateSerdes<>( changelogTopic, prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(rawValueSerde, new SerdeGetter(context)) + prepareValueSerde(plainValueSerde, new SerdeGetter(context)) ); } @@ -192,10 +192,10 @@ protected void initStoreSerde(final StateStoreContext context) { // additionally init raw value serde final String storeName = super.name(); final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE); - rawValueSerdes = new StateSerdes<>( + plainValueSerdes = new StateSerdes<>( changelogTopic, prepareKeySerde(keySerde, new SerdeGetter(context)), - prepareValueSerde(rawValueSerde, new SerdeGetter(context)) + prepareValueSerde(plainValueSerde, new SerdeGetter(context)) ); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java index ff9cf2cecc91f..e33988b971c69 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java @@ -23,11 +23,13 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; +import java.util.Objects; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -116,6 +118,8 @@ public class RocksDBVersionedStore implements VersionedKeyValueStore delete(final Bytes key, final long timestamp) { + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + if (timestamp < observedStreamTime - gracePeriod) { expiredRecordSensor.record(1.0d, context.currentSystemTimeMs()); LOG.warn("Skipping record for expired delete."); @@ -157,6 +164,9 @@ public VersionedRecord delete(final Bytes key, final long timestamp) { @Override public VersionedRecord get(final Bytes key) { + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); + // latest value (if present) is guaranteed to be in the latest value store final byte[] rawLatestValueAndTimestamp = latestValueStore.get(key); if (rawLatestValueAndTimestamp != null) { @@ -171,6 +181,8 @@ public VersionedRecord get(final Bytes key) { @Override public VersionedRecord get(final Bytes key, final long asOfTimestamp) { + Objects.requireNonNull(key, "key cannot be null"); + validateStoreOpen(); if (asOfTimestamp < observedStreamTime - historyRetention) { // history retention exceeded. we still check the latest value store in case the @@ -373,6 +385,12 @@ void restoreBatch(final Collection> records) { } } + private void validateStoreOpen() { + if (!open) { + throw new InvalidStateStoreException("Store " + name + " is currently closed"); + } + } + /** * Generic interface for segment stores. See {@link VersionedStoreClient} for use. */ @@ -847,7 +865,6 @@ private long finishPut( segment.put(key, segmentValue.serialize()); } } - return foundTs; } else { // insert into segment corresponding to foundTs, as foundTs represents the validTo // timestamp of the current put. @@ -891,8 +908,8 @@ private long finishPut( segment.put(key, segmentValue.serialize()); } } - return foundTs; } + return foundTs; } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java index 31bcc9e0e09da..1d17cbb53a289 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java @@ -66,7 +66,7 @@ public void prepareTopology() throws InterruptedException { private final TestRecord expectedFinalJoinResultUnversioned = new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 4L); private final TestRecord expectedFinalJoinResultLeftVersionedOnly = new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 15L); private final TestRecord expectedFinalJoinResultRightVersionedOnly = new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 14L); - private final TestRecord expectedFinalMultiJoinResult = new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 4L); + private final TestRecord expectedFinalMultiJoinResult = new TestRecord<>(ANY_UNIQUE_KEY, "F-f-f", null, 4L); private final String storeName = appID + "-store"; private final Materialized> materialized = Materialized.>as(storeName) @@ -90,26 +90,26 @@ public void testInner() throws Exception { null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 7L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 14L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 14L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 4L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 4L)) ); runTestWithDriver(input, expectedResult, storeName); @@ -130,27 +130,27 @@ public void testLeft() throws Exception { final List>> expectedResult = Arrays.asList( null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 7L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 14L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 14L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-null", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 4L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-null", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 4L)) ); runTestWithDriver(input, expectedResult, storeName); @@ -171,27 +171,27 @@ public void testOuter() throws Exception { final List>> expectedResult = Arrays.asList( null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-b", null, 7L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 7L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d", null, 7L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 7L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 3L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 3L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-e", null, 14L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-e", null, 14L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 9L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-null", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 4L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 9L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-null", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-f", null, 4L)) ); runTestWithDriver(input, expectedResult, storeName); @@ -215,14 +215,14 @@ public void testInnerWithVersionedStores() { null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), null, null, null, @@ -230,7 +230,7 @@ public void testInnerWithVersionedStores() { null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), null, null, null, @@ -256,23 +256,23 @@ public void testLeftWithVersionedStores() { final List>> expectedResult = Arrays.asList( null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, null, null, null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), null, null, null, @@ -298,23 +298,23 @@ public void testOuterWithVersionedStores() { final List>> expectedResult = Arrays.asList( null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-b", null, 7L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, null, null, null, null, Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-e", null, 14L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), null, null, null, @@ -341,14 +341,14 @@ public void testInnerWithLeftVersionedOnly() throws Exception { null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), null, null, null, @@ -356,11 +356,11 @@ public void testInnerWithLeftVersionedOnly() throws Exception { null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 15L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 15L)) ); runTestWithDriver(input, expectedResult, storeName); @@ -383,27 +383,27 @@ public void testLeftWithLeftVersionedOnly() throws Exception { final List>> expectedResult = Arrays.asList( null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, null, null, null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-null", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-null", null, 15L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-f", null, 15L)) ); runTestWithDriver(input, expectedResult, storeName); @@ -470,14 +470,14 @@ public void testInnerWithRightVersionedOnly() throws Exception { null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), null, null, null, @@ -485,10 +485,10 @@ public void testInnerWithRightVersionedOnly() throws Exception { null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 14L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 14L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 14L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 14L)), null ); @@ -512,26 +512,26 @@ public void testLeftWithRightVersionedOnly() throws Exception { final List>> expectedResult = Arrays.asList( null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-null", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 2L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-null", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 2L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 14L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 14L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 14L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 14L)), null ); @@ -555,26 +555,26 @@ public void testOuterWithRightVersionedOnly() throws Exception { final List>> expectedResult = Arrays.asList( null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null", null, 3L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a", null, 5L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-b", null, 6L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-b", null, 7L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 9L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null", null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-null", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 2L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-null", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 2L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-e", null, 14L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-e", null, 14L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-e", null, 14L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "E-e", null, 15L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-e", null, 14L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 14L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "F-e", null, 14L)), null ); @@ -603,24 +603,24 @@ public void testInnerInner() throws Exception { null, null, Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, null, Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), - null, // correct would be -> new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L) + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), + null, // correct would be -> new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L) // we don't get correct value, because of self-join of `rightTable` null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) ); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); @@ -646,23 +646,23 @@ public void testInnerLeft() throws Exception { null, null, Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, null, Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) ); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); @@ -688,26 +688,26 @@ public void testInnerOuter() throws Exception { null, null, Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-b", null, 7L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), null, Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L), - new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), + new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L), + new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), null, null, // incorrect result `null-d` is caused by self-join of `rightTable` Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) ); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); @@ -733,23 +733,23 @@ public void testLeftInner() throws Exception { null, null, Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, null, Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), null, null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) ); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); @@ -773,27 +773,27 @@ public void testLeftLeft() throws Exception { final List>> expectedResult = Arrays.asList( null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null, 3L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null, 3L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 7L)), null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 9L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 9L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L), - new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), + new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) ); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); @@ -817,27 +817,27 @@ public void testLeftOuter() throws Exception { final List>> expectedResult = Arrays.asList( null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null, 3L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null, 3L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-b", null, 7L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 9L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 9L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L), - new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), + new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-d", null, 14L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) ); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); @@ -863,25 +863,25 @@ public void testOuterInner() throws Exception { null, null, Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-b-b", null, 7L)), null, null, Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 11L)), null, null, Arrays.asList( new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) ); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); @@ -905,29 +905,29 @@ public void testOuterLeft() throws Exception { final List>> expectedResult = Arrays.asList( null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null, 3L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null, 3L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-b-b", null, 7L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 9L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 9L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L), - new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), + new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, Arrays.asList( new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) ); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); @@ -951,31 +951,31 @@ public void testOuterOuter() throws Exception { final List>> expectedResult = Arrays.asList( null, null, - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null, 3L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "A-null-null", null, 3L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), - new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L), + new TestRecord<>(ANY_UNIQUE_KEY, "A-a-a", null, 4L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "B-a-a", null, 5L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), - new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-b-b", null, 7L)), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L), + new TestRecord<>(ANY_UNIQUE_KEY, "B-b-b", null, 6L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "null-b-b", null, 7L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L), - new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 9L)), + new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L), + new TestRecord<>(ANY_UNIQUE_KEY, null, null, 8L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 9L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), - new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-c-c", null, 10L)), Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L), - new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), + new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L), + new TestRecord<>(ANY_UNIQUE_KEY, "C-null-null", null, 11L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, null, null, 12L)), null, Arrays.asList( - new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), - new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)), - Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) + new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L), + new TestRecord<>(ANY_UNIQUE_KEY, "null-d-d", null, 14L)), + Collections.singletonList(new TestRecord<>(ANY_UNIQUE_KEY, "D-d-d", null, 15L)) ); runTestWithDriver(inputWithoutOutOfOrderData, expectedResult, storeName); }