Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
Expand Down Expand Up @@ -775,7 +773,7 @@ public void close() { }
return streams;
}

private void writeInputData(final List<KeyValue<Long, Long>> records) throws Exception {
private void writeInputData(final List<KeyValue<Long, Long>> records) {
IntegrationTestUtils.produceKeyValuesSynchronously(
MULTI_PARTITION_INPUT_TOPIC,
records,
Expand Down Expand Up @@ -849,21 +847,9 @@ private Set<KeyValue<Long, Long>> getMaxPerKey(final List<KeyValue<Long, Long>>
}

private void verifyStateStore(final KafkaStreams streams,
final Set<KeyValue<Long, Long>> expectedStoreContent) {
ReadOnlyKeyValueStore<Long, Long> store = null;

final long maxWaitingTime = System.currentTimeMillis() + 300000L;
while (System.currentTimeMillis() < maxWaitingTime) {
try {
store = streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()));
break;
} catch (final InvalidStateStoreException okJustRetry) {
try {
Thread.sleep(5000L);
} catch (final Exception ignore) { }
}
}

final Set<KeyValue<Long, Long>> expectedStoreContent) throws InterruptedException {
final ReadOnlyKeyValueStore<Long, Long> store = IntegrationTestUtils
.getStore(300_000L, storeName, streams, QueryableStoreTypes.keyValueStore());
assertNotNull(store);

final KeyValueIterator<Long, Long> it = store.all();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
Expand Down Expand Up @@ -60,6 +58,8 @@
import java.util.Map;
import java.util.Properties;

import static org.junit.Assert.assertNotNull;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class GlobalKTableEOSIntegrationTest {
Expand Down Expand Up @@ -113,11 +113,13 @@ public void before() throws Exception {
.replace(']', '_');
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 300);
globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()),
Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
.withKeySerde(Serdes.Long())
Expand All @@ -128,7 +130,7 @@ public void before() throws Exception {
}

@After
public void whenShuttingDown() throws Exception {
public void after() throws Exception {
if (kafkaStreams != null) {
kafkaStreams.close();
}
Expand Down Expand Up @@ -158,8 +160,9 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception {

produceGlobalTableValues();

final ReadOnlyKeyValueStore<Long, String> replicatedStore =
kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
final ReadOnlyKeyValueStore<Long, String> replicatedStore = IntegrationTestUtils
.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
assertNotNull(replicatedStore);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we have to check for null now?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since previously the un-wrapped call would simply throw the exception, asserting non-null here is equivalent to making sure that the store is indeed returned.


TestUtils.waitForCondition(
() -> "J".equals(replicatedStore.get(5L)),
Expand Down Expand Up @@ -202,8 +205,9 @@ public void shouldKStreamGlobalKTableJoin() throws Exception {

produceGlobalTableValues();

final ReadOnlyKeyValueStore<Long, String> replicatedStore =
kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
final ReadOnlyKeyValueStore<Long, String> replicatedStore = IntegrationTestUtils
.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
assertNotNull(replicatedStore);

TestUtils.waitForCondition(
() -> "J".equals(replicatedStore.get(5L)),
Expand Down Expand Up @@ -236,14 +240,12 @@ public void shouldRestoreTransactionalMessages() throws Exception {
expected.put(3L, "C");
expected.put(4L, "D");

final ReadOnlyKeyValueStore<Long, String> store = IntegrationTestUtils
.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
assertNotNull(store);

TestUtils.waitForCondition(
() -> {
final ReadOnlyKeyValueStore<Long, String> store;
try {
store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
} catch (final InvalidStateStoreException ex) {
return false;
}
final Map<Long, String> result = new HashMap<>();
final Iterator<KeyValue<Long, String>> it = store.all();
while (it.hasNext()) {
Expand All @@ -270,14 +272,12 @@ public void shouldNotRestoreAbortedMessages() throws Exception {
expected.put(3L, "C");
expected.put(4L, "D");

final ReadOnlyKeyValueStore<Long, String> store = IntegrationTestUtils
.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
assertNotNull(store);

TestUtils.waitForCondition(
() -> {
final ReadOnlyKeyValueStore<Long, String> store;
try {
store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
} catch (final InvalidStateStoreException ex) {
return false;
}
final Map<Long, String> result = new HashMap<>();
final Iterator<KeyValue<Long, String>> it = store.all();
while (it.hasNext()) {
Expand All @@ -296,6 +296,7 @@ private void createTopics() throws Exception {
.replace(']', '_');
streamTopic = "stream-" + suffix;
globalTableTopic = "globalTable-" + suffix;
CLUSTER.deleteAllTopicsAndWait(300_000L);
CLUSTER.createTopics(streamTopic);
CLUSTER.createTopic(globalTableTopic, 2, 1);
}
Expand All @@ -305,7 +306,7 @@ private void startStreams() {
kafkaStreams.start();
}

private void produceTopicValues(final String topic) throws Exception {
private void produceTopicValues(final String topic) {
IntegrationTestUtils.produceKeyValuesSynchronously(
topic,
Arrays.asList(
Expand Down Expand Up @@ -341,7 +342,7 @@ private void produceAbortedMessages() throws Exception {
mockTime.milliseconds());
}

private void produceInitialGlobalTableValues() throws Exception {
private void produceInitialGlobalTableValues() {
final Properties properties = new Properties();
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid");
properties.put(ProducerConfig.RETRIES_CONFIG, 1);
Expand All @@ -362,7 +363,7 @@ private void produceInitialGlobalTableValues() throws Exception {
true);
}

private void produceGlobalTableValues() throws Exception {
private void produceGlobalTableValues() {
IntegrationTestUtils.produceKeyValuesSynchronously(
globalTableTopic,
Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
Expand Down Expand Up @@ -58,6 +57,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.junit.Assert.assertNotNull;

@Category({IntegrationTest.class})
public class GlobalKTableIntegrationTest {
Expand Down Expand Up @@ -144,16 +144,18 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception {
firstTimestamp = mockTime.milliseconds();
produceGlobalTableValues();

final ReadOnlyKeyValueStore<Long, String> replicatedStore =
kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
final ReadOnlyKeyValueStore<Long, String> replicatedStore = IntegrationTestUtils
.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
assertNotNull(replicatedStore);

TestUtils.waitForCondition(
() -> "J".equals(replicatedStore.get(5L)),
30000,
"waiting for data in replicated store");

final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> replicatedStoreWithTimestamp =
kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> replicatedStoreWithTimestamp = IntegrationTestUtils
.getStore(globalStore, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
assertNotNull(replicatedStoreWithTimestamp);
assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L)));

firstTimestamp = mockTime.milliseconds();
Expand Down Expand Up @@ -211,16 +213,18 @@ public void shouldKStreamGlobalKTableJoin() throws Exception {
firstTimestamp = mockTime.milliseconds();
produceGlobalTableValues();

final ReadOnlyKeyValueStore<Long, String> replicatedStore =
kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
final ReadOnlyKeyValueStore<Long, String> replicatedStore = IntegrationTestUtils
.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
assertNotNull(replicatedStore);

TestUtils.waitForCondition(
() -> "J".equals(replicatedStore.get(5L)),
30000,
"waiting for data in replicated store");

final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> replicatedStoreWithTimestamp =
kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
final ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> replicatedStoreWithTimestamp = IntegrationTestUtils
.getStore(globalStore, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
assertNotNull(replicatedStoreWithTimestamp);
assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L)));

firstTimestamp = mockTime.milliseconds();
Expand Down Expand Up @@ -257,17 +261,23 @@ public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception {
produceInitialGlobalTableValues();

startStreams();
ReadOnlyKeyValueStore<Long, String> store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
ReadOnlyKeyValueStore<Long, String> store = IntegrationTestUtils
.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
assertNotNull(store);

assertThat(store.approximateNumEntries(), equalTo(4L));
ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> timestampedStore =
kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));

ReadOnlyKeyValueStore<Long, ValueAndTimestamp<String>> timestampedStore = IntegrationTestUtils
.getStore(globalStore, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
assertNotNull(timestampedStore);

assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
kafkaStreams.close();

startStreams();
store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
store = IntegrationTestUtils.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore());
assertThat(store.approximateNumEntries(), equalTo(4L));
timestampedStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
timestampedStore = IntegrationTestUtils.getStore(globalStore, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Aggregator;
Expand Down Expand Up @@ -677,7 +676,8 @@ public void close() {}

// verify can query data via IQ
final ReadOnlySessionStore<String, String> sessionStore =
kafkaStreams.store(StoreQueryParameters.fromNameAndType(userSessionsStore, QueryableStoreTypes.sessionStore()));
IntegrationTestUtils.getStore(userSessionsStore, kafkaStreams, QueryableStoreTypes.sessionStore());

final KeyValueIterator<Windowed<String>, String> bob = sessionStore.fetch("bob");
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start")));
assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
Expand Down Expand Up @@ -119,11 +118,8 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
// Assert that all messages in the first batch were processed in a timely manner
assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));

final ReadOnlyKeyValueStore<Integer, Integer> store1 = kafkaStreams1
.store(StoreQueryParameters.fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore()));

final ReadOnlyKeyValueStore<Integer, Integer> store2 = kafkaStreams2
.store(StoreQueryParameters.fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore()));
final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore());
final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore());

final boolean kafkaStreams1WasFirstActive;
final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
Expand Down Expand Up @@ -163,8 +159,10 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception {
// Assert that all messages in the second batch were processed in a timely manner
assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OptimizedKTableIntegrationTest.shouldApplyUpdatesToStandbyStore still failed on one of the builds at this line :/
But, at least we got farther into the test before it failed so I'd say this is still an improvement 😄


// Assert that the current value in store reflects all messages being processed
assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1)));
TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a minor fix: we should retry this condition instead of asserting it only once.

// Assert that the current value in store reflects all messages being processed
assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1)));
});
}

private void produceValueRange(final int key, final int start, final int endExclusive) throws Exception {
Expand Down Expand Up @@ -227,10 +225,11 @@ private Properties streamsConfiguration() {
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a fix to the test itself: with caching enabled, records are delayed before being sent to the sink topics.

config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
return config;
}
}
Loading