Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1269,8 +1269,14 @@ private <VO, VR> KStream<K, VR> doStreamTableJoin(final KTable<K, VO> table,
throw new IllegalArgumentException("KTable must be versioned to use a grace period in a stream table join.");
}
bufferStoreName = Optional.of(name + "-Buffer");
final RocksDBTimeOrderedKeyValueBuffer.Builder<Object, Object> storeBuilder =
new RocksDBTimeOrderedKeyValueBuffer.Builder<>(bufferStoreName.get(), joined.gracePeriod(), name);
final RocksDBTimeOrderedKeyValueBuffer.Builder<K, V> storeBuilder =
new RocksDBTimeOrderedKeyValueBuffer.Builder<>(
bufferStoreName.get(),
joinedInternal.keySerde() != null ? joinedInternal.keySerde() : keySerde,
joinedInternal.valueSerde() != null ? joinedInternal.valueSerde() : valueSerde,
joined.gracePeriod(),
name
);
builder.addStateStore(new StoreBuilderWrapper(storeBuilder));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,23 @@ public class RocksDBTimeOrderedKeyValueBuffer<K, V> implements TimeOrderedKeyVal
public static class Builder<K, V> implements StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> {

private final String storeName;
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
private boolean loggingEnabled = true;
private Map<String, String> logConfig = new HashMap<>();
private final Duration grace;
private final String topic;

public Builder(final String storeName, final Duration grace, final String topic) {
public Builder(
final String storeName,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Duration grace,
final String topic
) {
this.storeName = storeName;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.grace = grace;
this.topic = topic;
}
Expand Down Expand Up @@ -116,6 +126,8 @@ public StoreBuilder<TimeOrderedKeyValueBuffer<K, V, V>> withLoggingDisabled() {
public TimeOrderedKeyValueBuffer<K, V, V> build() {
return new RocksDBTimeOrderedKeyValueBuffer<>(
new RocksDBTimeOrderedKeyValueBytesStoreSupplier(storeName).get(),
keySerde,
valueSerde,
grace,
topic,
loggingEnabled);
Expand All @@ -139,10 +151,14 @@ public String name() {


public RocksDBTimeOrderedKeyValueBuffer(final RocksDBTimeOrderedKeyValueBytesStore store,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Duration gracePeriod,
final String topic,
final boolean loggingEnabled) {
this.store = store;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
this.gracePeriod = gracePeriod.toMillis();
minTimestamp = store.minTimestamp();
minValid = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,17 @@ public static Collection<Object[]> data() {
@BeforeClass
public static void setupConfigsAndUtils() {
STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
}

void prepareEnvironment() throws InterruptedException {
void prepareEnvironment(final boolean setSerdes) throws InterruptedException {
if (setSerdes) {
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
} else {
STREAMS_CONFIG.remove(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG);
STREAMS_CONFIG.remove(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG);
}
if (!cacheEnabled) {
STREAMS_CONFIG.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
}
Expand Down Expand Up @@ -278,16 +283,13 @@ void runSelfJoinTestWithDriver(final List<List<TestRecord<Long, String>>> expect
private void checkQueryableStore(final String queryableName, final TestRecord<Long, String> expectedFinalResult, final TopologyTestDriver driver) {
final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> store = driver.getTimestampedKeyValueStore(queryableName);

final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all();
final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next();
try (final KeyValueIterator<Long, ValueAndTimestamp<String>> all = store.all()) {
final KeyValue<Long, ValueAndTimestamp<String>> onlyEntry = all.next();

try {
assertThat(onlyEntry.key, is(expectedFinalResult.key()));
assertThat(onlyEntry.value.value(), is(expectedFinalResult.value()));
assertThat(onlyEntry.value.timestamp(), is(expectedFinalResult.timestamp()));
assertThat(all.hasNext(), is(false));
} finally {
all.close();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public StreamStreamJoinIntegrationTest(final boolean cacheEnabled) {

@Before
public void prepareTopology() throws InterruptedException {
super.prepareEnvironment();
super.prepareEnvironment(true);

appID = "stream-stream-join-integration-test";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public StreamTableJoinIntegrationTest(final boolean cacheEnabled) {

@Before
public void prepareTopology() throws InterruptedException {
super.prepareEnvironment();
super.prepareEnvironment(true);

appID = "stream-table-join-integration-test";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.IntegrationTest;
Expand Down Expand Up @@ -60,20 +62,20 @@ public StreamTableJoinWithGraceIntegrationTest(final boolean cacheEnabled) {

@Before
public void prepareTopology() throws InterruptedException {
super.prepareEnvironment();
appID = "stream-table-join-integration-test";
builder = new StreamsBuilder();
joined = Joined.with(Serdes.Long(), Serdes.String(), Serdes.String(), "Grace", Duration.ofMillis(2));
}

@Test
public void testInnerWithVersionedStore() {
public void testInnerWithVersionedStore() throws Exception {
super.prepareEnvironment(false);
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");

leftStream = builder.stream(INPUT_TOPIC_LEFT);
rightTable = builder.table(INPUT_TOPIC_RIGHT, Materialized.as(
leftStream = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.Long(), Serdes.String()));
rightTable = builder.table(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.as(
Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5))));
leftStream.join(rightTable, valueJoiner, joined).to(OUTPUT_TOPIC);
leftStream.join(rightTable, valueJoiner, joined).to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.String()));

final List<List<TestRecord<Long, String>>> expectedResult = Arrays.asList(
null,
Expand Down Expand Up @@ -105,7 +107,8 @@ public void testInnerWithVersionedStore() {
}

@Test
public void testLeftWithVersionedStore() {
public void testLeftWithVersionedStore() throws Exception {
super.prepareEnvironment(true);
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");

leftStream = builder.stream(INPUT_TOPIC_LEFT);
Expand Down Expand Up @@ -141,4 +144,4 @@ public void testLeftWithVersionedStore() {

runTestWithDriver(input, expectedResult);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public TableTableJoinIntegrationTest(final boolean cacheEnabled) {

@Before
public void prepareTopology() throws InterruptedException {
super.prepareEnvironment();
super.prepareEnvironment(true);

appID = "table-table-join-integration-test";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
Expand Down Expand Up @@ -57,18 +58,16 @@ public class RocksDBTimeOrderedKeyValueBufferTest {

@Before
public void setUp() {
when(serdeGetter.keySerde()).thenReturn(new Serdes.StringSerde());
when(serdeGetter.valueSerde()).thenReturn(new Serdes.StringSerde());
final Metrics metrics = new Metrics();
offset = 0;
streamsMetrics = new StreamsMetricsImpl(metrics, "test-client", StreamsConfig.METRICS_LATEST, new MockTime());
context = new MockInternalNewProcessorContext<>(StreamsTestUtils.getStreamsConfig(), new TaskId(0, 0), TestUtils.tempDirectory());
}

private void createBuffer(final Duration grace) {
private void createBuffer(final Duration grace, final Serde<String> serde) {
final RocksDBTimeOrderedKeyValueBytesStore store = new RocksDBTimeOrderedKeyValueBytesStoreSupplier("testing").get();

buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, grace, "testing", false);
buffer = new RocksDBTimeOrderedKeyValueBuffer<>(store, serde, serde, grace, "testing", false);
buffer.setSerdesIfNull(serdeGetter);
buffer.init((StateStoreContext) context, store);
}
Expand All @@ -81,14 +80,16 @@ private boolean pipeRecord(final String key, final String value, final long time

@Test
public void shouldReturnIfRecordWasAdded() {
createBuffer(Duration.ofMillis(1));
when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
createBuffer(Duration.ofMillis(1), null);
assertThat(pipeRecord("K", "V", 2L), equalTo(true));
assertThat(pipeRecord("K", "V", 0L), equalTo(false));
}

@Test
public void shouldPutInBufferAndUpdateFields() {
createBuffer(Duration.ofMinutes(1));
createBuffer(Duration.ofMinutes(1), Serdes.String());
assertNumSizeAndTimestamp(buffer, 0, Long.MAX_VALUE, 0);
pipeRecord("1", "0", 0L);
assertNumSizeAndTimestamp(buffer, 1, 0, 42);
Expand All @@ -98,7 +99,7 @@ public void shouldPutInBufferAndUpdateFields() {

@Test
public void shouldAddAndEvictRecord() {
createBuffer(Duration.ZERO);
createBuffer(Duration.ZERO, Serdes.String());
final AtomicInteger count = new AtomicInteger(0);
pipeRecord("1", "0", 0L);
assertNumSizeAndTimestamp(buffer, 1, 0, 42);
Expand All @@ -109,7 +110,9 @@ public void shouldAddAndEvictRecord() {

@Test
public void shouldAddAndEvictRecordTwice() {
createBuffer(Duration.ZERO);
when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
createBuffer(Duration.ZERO, null);
final AtomicInteger count = new AtomicInteger(0);
pipeRecord("1", "0", 0L);
assertNumSizeAndTimestamp(buffer, 1, 0, 42);
Expand All @@ -125,7 +128,7 @@ public void shouldAddAndEvictRecordTwice() {

@Test
public void shouldAddAndEvictRecordTwiceWithNonZeroGrace() {
createBuffer(Duration.ofMillis(1));
createBuffer(Duration.ofMillis(1), Serdes.String());
final AtomicInteger count = new AtomicInteger(0);
pipeRecord("1", "0", 0L);
buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement());
Expand All @@ -139,7 +142,9 @@ public void shouldAddAndEvictRecordTwiceWithNonZeroGrace() {

@Test
public void shouldAddRecordsTwiceAndEvictRecordsOnce() {
createBuffer(Duration.ZERO);
when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
createBuffer(Duration.ZERO, null);
final AtomicInteger count = new AtomicInteger(0);
pipeRecord("1", "0", 0L);
buffer.evictWhile(() -> buffer.numRecords() > 1, r -> count.getAndIncrement());
Expand All @@ -151,7 +156,9 @@ public void shouldAddRecordsTwiceAndEvictRecordsOnce() {

@Test
public void shouldDropLateRecords() {
createBuffer(Duration.ZERO);
when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
createBuffer(Duration.ZERO, null);
pipeRecord("1", "0", 1L);
assertNumSizeAndTimestamp(buffer, 1, 1, 42);
pipeRecord("2", "0", 0L);
Expand All @@ -160,7 +167,7 @@ public void shouldDropLateRecords() {

@Test
public void shouldDropLateRecordsWithNonZeroGrace() {
createBuffer(Duration.ofMillis(1));
createBuffer(Duration.ofMillis(1), Serdes.String());
pipeRecord("1", "0", 2L);
assertNumSizeAndTimestamp(buffer, 1, 2, 42);
pipeRecord("2", "0", 1L);
Expand All @@ -171,7 +178,9 @@ public void shouldDropLateRecordsWithNonZeroGrace() {

@Test
public void shouldHandleCollidingKeys() {
createBuffer(Duration.ofMillis(1));
when(serdeGetter.keySerde()).thenReturn((Serde) new Serdes.StringSerde());
when(serdeGetter.valueSerde()).thenReturn((Serde) new Serdes.StringSerde());
createBuffer(Duration.ofMillis(1), null);
final AtomicInteger count = new AtomicInteger(0);
pipeRecord("2", "0", 0L);
buffer.evictWhile(() -> buffer.numRecords() > 0, r -> count.getAndIncrement());
Expand All @@ -196,4 +205,4 @@ private void assertNumSizeAndTimestamp(final TimeOrderedKeyValueBuffer<String, S
assertThat(buffer.minTimestamp(), equalTo(time));
assertThat(buffer.bufferSize(), equalTo(size));
}
}
}