From f936848a86992d779fa37703c4a4a7b83fc30727 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 27 Apr 2020 21:49:05 -0700 Subject: [PATCH 1/2] wrap around getting stores --- .../integration/EosIntegrationTest.java | 22 +-- .../GlobalKTableEOSIntegrationTest.java | 34 ++-- .../GlobalKTableIntegrationTest.java | 38 ++-- .../KStreamAggregationIntegrationTest.java | 4 +- .../OptimizedKTableIntegrationTest.java | 17 +- .../QueryableStateIntegrationTest.java | 80 +++----- .../StoreQueryIntegrationTest.java | 70 +++---- .../StoreUpgradeIntegrationTest.java | 72 +++++-- .../utils/IntegrationTestUtils.java | 180 +++++++++--------- 9 files changed, 253 insertions(+), 264 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 747ac40cc3fce..1deb7c4a05a5b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -31,10 +31,8 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; @@ -736,7 +734,7 @@ public void close() { } return streams; } - private void writeInputData(final List> records) throws Exception { + private void writeInputData(final List> records) { IntegrationTestUtils.produceKeyValuesSynchronously( MULTI_PARTITION_INPUT_TOPIC, records, @@ -810,21 +808,9 @@ private Set> getMaxPerKey(final List> } private void verifyStateStore(final KafkaStreams streams, - final Set> expectedStoreContent) { - ReadOnlyKeyValueStore store = null; - - final long maxWaitingTime = System.currentTimeMillis() + 300000L; - while (System.currentTimeMillis() < maxWaitingTime) { - try { - store = streams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())); - break; - } catch (final InvalidStateStoreException okJustRetry) { - try { - Thread.sleep(5000L); - } catch (final Exception ignore) { } - } - } - + final Set> expectedStoreContent) throws InterruptedException { + final ReadOnlyKeyValueStore store = IntegrationTestUtils + .getStore(300000L, storeName, streams, QueryableStoreTypes.keyValueStore()); assertNotNull(store); final KeyValueIterator it = store.all(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 7f926e09ab34e..18ed52718aa5c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -27,8 +27,6 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.StoreQueryParameters; -import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; @@ -60,6 +58,8 @@ import java.util.Map; import java.util.Properties; +import static org.junit.Assert.assertNotNull; + @RunWith(Parameterized.class) @Category({IntegrationTest.class}) public class GlobalKTableEOSIntegrationTest { @@ -158,8 +158,9 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { produceGlobalTableValues(); - final ReadOnlyKeyValueStore replicatedStore = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); + final ReadOnlyKeyValueStore replicatedStore = IntegrationTestUtils + .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); + assertNotNull(replicatedStore); TestUtils.waitForCondition( () -> "J".equals(replicatedStore.get(5L)), @@ -202,8 +203,9 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { produceGlobalTableValues(); - final ReadOnlyKeyValueStore replicatedStore = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); + final ReadOnlyKeyValueStore replicatedStore = IntegrationTestUtils + .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); + assertNotNull(replicatedStore); TestUtils.waitForCondition( () -> "J".equals(replicatedStore.get(5L)), @@ -236,14 +238,12 @@ public void shouldRestoreTransactionalMessages() throws Exception { expected.put(3L, "C"); expected.put(4L, "D"); + final ReadOnlyKeyValueStore store = IntegrationTestUtils + .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); + assertNotNull(store); + TestUtils.waitForCondition( () -> { - final ReadOnlyKeyValueStore store; - try { - store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); - } catch (final InvalidStateStoreException ex) { - return false; - } final Map result = new HashMap<>(); final Iterator> it = store.all(); while (it.hasNext()) { @@ -270,14 +270,12 @@ public void shouldNotRestoreAbortedMessages() throws Exception { expected.put(3L, "C"); expected.put(4L, "D"); + final ReadOnlyKeyValueStore store = IntegrationTestUtils + .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); + assertNotNull(store); + TestUtils.waitForCondition( () -> { - final ReadOnlyKeyValueStore store; - try { - store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); - } catch (final InvalidStateStoreException ex) { - return false; - } final Map result = new HashMap<>(); final Iterator> it = store.all(); while (it.hasNext()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java index 628d913747831..eda90e0c0dc5f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java @@ -26,7 +26,6 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; @@ -58,6 +57,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertNotNull; @Category({IntegrationTest.class}) public class GlobalKTableIntegrationTest { @@ -144,16 +144,18 @@ public void shouldKStreamGlobalKTableLeftJoin() throws Exception { firstTimestamp = mockTime.milliseconds(); produceGlobalTableValues(); - final ReadOnlyKeyValueStore replicatedStore = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); + final ReadOnlyKeyValueStore replicatedStore = IntegrationTestUtils + .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); + assertNotNull(replicatedStore); TestUtils.waitForCondition( () -> "J".equals(replicatedStore.get(5L)), 30000, "waiting for data in replicated store"); - final ReadOnlyKeyValueStore> replicatedStoreWithTimestamp = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore())); + final ReadOnlyKeyValueStore> replicatedStoreWithTimestamp = IntegrationTestUtils + .getStore(globalStore, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore()); + assertNotNull(replicatedStoreWithTimestamp); assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L))); firstTimestamp = mockTime.milliseconds(); @@ -211,16 +213,18 @@ public void shouldKStreamGlobalKTableJoin() throws Exception { firstTimestamp = mockTime.milliseconds(); produceGlobalTableValues(); - final ReadOnlyKeyValueStore replicatedStore = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); + final ReadOnlyKeyValueStore replicatedStore = IntegrationTestUtils + .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); + assertNotNull(replicatedStore); TestUtils.waitForCondition( () -> "J".equals(replicatedStore.get(5L)), 30000, "waiting for data in replicated store"); - final ReadOnlyKeyValueStore> replicatedStoreWithTimestamp = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore())); + final ReadOnlyKeyValueStore> replicatedStoreWithTimestamp = IntegrationTestUtils + .getStore(globalStore, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore()); + assertNotNull(replicatedStoreWithTimestamp); assertThat(replicatedStoreWithTimestamp.get(5L), equalTo(ValueAndTimestamp.make("J", firstTimestamp + 4L))); firstTimestamp = mockTime.milliseconds(); @@ -257,17 +261,23 @@ public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception { produceInitialGlobalTableValues(); startStreams(); - ReadOnlyKeyValueStore store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); + ReadOnlyKeyValueStore store = IntegrationTestUtils + .getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); + assertNotNull(store); + assertThat(store.approximateNumEntries(), equalTo(4L)); - ReadOnlyKeyValueStore> timestampedStore = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore())); + + ReadOnlyKeyValueStore> timestampedStore = IntegrationTestUtils + .getStore(globalStore, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore()); + assertNotNull(timestampedStore); + assertThat(timestampedStore.approximateNumEntries(), equalTo(4L)); kafkaStreams.close(); startStreams(); - store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore())); + store = IntegrationTestUtils.getStore(globalStore, kafkaStreams, QueryableStoreTypes.keyValueStore()); assertThat(store.approximateNumEntries(), equalTo(4L)); - timestampedStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore())); + timestampedStore = IntegrationTestUtils.getStore(globalStore, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore()); assertThat(timestampedStore.approximateNumEntries(), equalTo(4L)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 3032037926717..1d5ab8e9985a2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -32,7 +32,6 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.KeyValueTimestamp; -import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Aggregator; @@ -677,7 +676,8 @@ public void close() {} // verify can query data via IQ final ReadOnlySessionStore sessionStore = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(userSessionsStore, QueryableStoreTypes.sessionStore())); + IntegrationTestUtils.getStore(userSessionsStore, kafkaStreams, QueryableStoreTypes.sessionStore()); + final KeyValueIterator, String> bob = sessionStore.fetch("bob"); assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t1, t1)), "start"))); assertThat(bob.next(), equalTo(KeyValue.pair(new Windowed<>("bob", new SessionWindow(t3, t4)), "pause:resume"))); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java index 32b4b0a702a92..d342631065315 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/OptimizedKTableIntegrationTest.java @@ -43,7 +43,6 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.KeyQueryMetadata; -import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; @@ -119,11 +118,8 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception { // Assert that all messages in the first batch were processed in a timely manner assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); - final ReadOnlyKeyValueStore store1 = kafkaStreams1 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())); - - final ReadOnlyKeyValueStore store2 = kafkaStreams2 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())); + final ReadOnlyKeyValueStore store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, QueryableStoreTypes.keyValueStore()); + final ReadOnlyKeyValueStore store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, QueryableStoreTypes.keyValueStore()); final boolean kafkaStreams1WasFirstActive; final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); @@ -163,8 +159,10 @@ public void shouldApplyUpdatesToStandbyStore() throws Exception { // Assert that all messages in the second batch were processed in a timely manner assertThat(semaphore.tryAcquire(batch2NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); - // Assert that the current value in store reflects all messages being processed - assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1))); + TestUtils.retryOnExceptionWithTimeout(100, 60 * 1000, () -> { + // Assert that the current value in store reflects all messages being processed + assertThat(newActiveStore.get(key), is(equalTo(totalNumMessages - 1))); + }); } private void produceValueRange(final int key, final int start, final int endExclusive) throws Exception { @@ -227,10 +225,11 @@ private Properties streamsConfiguration() { config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); + config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); - config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); return config; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 6592c3869b77c..b3929f6a1e161 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -35,7 +35,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.KafkaStreams.State; -import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.LagInfo; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; @@ -49,7 +48,6 @@ import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.StreamsMetadata; @@ -60,7 +58,6 @@ import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.NoRetryException; -import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -305,9 +302,8 @@ private void verifyAllKVKeys(final List streamsList, final int index = queryMetadata.getActiveHost().port(); final KafkaStreams streamsWithKey = pickInstanceByPort ? streamsList.get(index) : streams; - final QueryableStoreType> queryableStoreType = QueryableStoreTypes.keyValueStore(); final ReadOnlyKeyValueStore store = - streamsWithKey.store(StoreQueryParameters.fromNameAndType(storeName, queryableStoreType).enableStaleStores()); + IntegrationTestUtils.getStore(storeName, streamsWithKey, true, QueryableStoreTypes.keyValueStore()); if (store == null) { nullStoreKeys.add(key); continue; @@ -365,9 +361,8 @@ private void verifyAllWindowedKeys(final List streamsList, final int index = queryMetadata.getActiveHost().port(); final KafkaStreams streamsWithKey = pickInstanceByPort ? streamsList.get(index) : streams; - final QueryableStoreType> queryableWindowStoreType = QueryableStoreTypes.windowStore(); final ReadOnlyWindowStore store = - streamsWithKey.store(StoreQueryParameters.fromNameAndType(storeName, queryableWindowStoreType).enableStaleStores()); + IntegrationTestUtils.getStore(storeName, streamsWithKey, true, QueryableStoreTypes.windowStore()); if (store == null) { nullStoreKeys.add(key); continue; @@ -649,10 +644,10 @@ public void shouldAllowConcurrentAccesses() throws Exception { waitUntilAtLeastNumRecordProcessed(outputTopicConcurrent, 1); final ReadOnlyKeyValueStore keyValueStore = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName + "-" + streamConcurrent, QueryableStoreTypes.keyValueStore())); + IntegrationTestUtils.getStore(storeName + "-" + streamConcurrent, kafkaStreams, QueryableStoreTypes.keyValueStore()); final ReadOnlyWindowStore windowStore = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(windowStoreName + "-" + streamConcurrent, QueryableStoreTypes.windowStore())); + IntegrationTestUtils.getStore(windowStoreName + "-" + streamConcurrent, kafkaStreams, QueryableStoreTypes.windowStore()); final Map expectedWindowState = new HashMap<>(); final Map expectedCount = new HashMap<>(); @@ -714,10 +709,11 @@ public void shouldBeAbleToQueryFilterState() throws Exception { waitUntilAtLeastNumRecordProcessed(outputTopic, 1); - final ReadOnlyKeyValueStore - myFilterStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType("queryFilter", QueryableStoreTypes.keyValueStore())); - final ReadOnlyKeyValueStore - myFilterNotStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType("queryFilterNot", QueryableStoreTypes.keyValueStore())); + final ReadOnlyKeyValueStore myFilterStore = + IntegrationTestUtils.getStore("queryFilter", kafkaStreams, QueryableStoreTypes.keyValueStore()); + + final ReadOnlyKeyValueStore myFilterNotStore = + IntegrationTestUtils.getStore("queryFilterNot", kafkaStreams, QueryableStoreTypes.keyValueStore()); for (final KeyValue expectedEntry : expectedBatch1) { TestUtils.waitForCondition(() -> expectedEntry.value.equals(myFilterStore.get(expectedEntry.key)), @@ -781,7 +777,8 @@ public void shouldBeAbleToQueryMapValuesState() throws Exception { waitUntilAtLeastNumRecordProcessed(outputTopic, 5); final ReadOnlyKeyValueStore myMapStore = - kafkaStreams.store(StoreQueryParameters.fromNameAndType("queryMapValues", QueryableStoreTypes.keyValueStore())); + IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, QueryableStoreTypes.keyValueStore()); + for (final KeyValue batchEntry : batch1) { assertEquals(Long.valueOf(batchEntry.value), myMapStore.get(batchEntry.key)); } @@ -828,9 +825,9 @@ public void shouldBeAbleToQueryMapValuesAfterFilterState() throws Exception { waitUntilAtLeastNumRecordProcessed(outputTopic, 1); - final ReadOnlyKeyValueStore - myMapStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType("queryMapValues", - QueryableStoreTypes.keyValueStore())); + final ReadOnlyKeyValueStore myMapStore = + IntegrationTestUtils.getStore("queryMapValues", kafkaStreams, QueryableStoreTypes.keyValueStore()); + for (final KeyValue expectedEntry : expectedBatch1) { assertEquals(expectedEntry.value, myMapStore.get(expectedEntry.key)); } @@ -889,11 +886,12 @@ private void verifyCanQueryState(final int cacheSizeBytes) throws Exception { waitUntilAtLeastNumRecordProcessed(outputTopic, 1); - final ReadOnlyKeyValueStore - myCount = kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())); + final ReadOnlyKeyValueStore myCount = + IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore()); final ReadOnlyWindowStore windowStore = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(windowStoreName, QueryableStoreTypes.windowStore())); + IntegrationTestUtils.getStore(windowStoreName, kafkaStreams, QueryableStoreTypes.windowStore()); + verifyCanGetByKey(keys, expectedCount, expectedCount, @@ -926,13 +924,8 @@ public void shouldNotMakeStoreAvailableUntilAllStoresAvailable() throws Exceptio final int maxWaitMs = 30000; - TestUtils.waitForCondition( - new WaitForStore(storeName), - maxWaitMs, - "waiting for store " + storeName); - final ReadOnlyKeyValueStore store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())); + IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore()); TestUtils.waitForCondition( () -> Long.valueOf(8).equals(store.get("hello")), @@ -950,9 +943,7 @@ public void shouldNotMakeStoreAvailableUntilAllStoresAvailable() throws Exceptio TestUtils.waitForCondition( () -> { try { - assertEquals( - Long.valueOf(8L), - kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())).get("hello")); + assertEquals(8L, IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore()).get("hello")); return true; } catch (final InvalidStateStoreException ise) { return false; @@ -963,24 +954,6 @@ public void shouldNotMakeStoreAvailableUntilAllStoresAvailable() throws Exceptio } - private class WaitForStore implements TestCondition { - private final String storeName; - - WaitForStore(final String storeName) { - this.storeName = storeName; - } - - @Override - public boolean conditionMet() { - try { - kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())); - return true; - } catch (final InvalidStateStoreException ise) { - return false; - } - } - } - @Test public void shouldAllowToQueryAfterThreadDied() throws Exception { final AtomicBoolean beforeFailure = new AtomicBoolean(true); @@ -1025,13 +998,8 @@ public void shouldAllowToQueryAfterThreadDied() throws Exception { final int maxWaitMs = 30000; - TestUtils.waitForCondition( - new WaitForStore(storeName), - maxWaitMs, - "waiting for store " + storeName); - final ReadOnlyKeyValueStore store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())); + IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore()); TestUtils.waitForCondition( () -> "12".equals(store.get("a")) && "34".equals(store.get("b")), @@ -1052,13 +1020,9 @@ public void shouldAllowToQueryAfterThreadDied() throws Exception { failed::get, maxWaitMs, "wait for thread to fail"); - TestUtils.waitForCondition( - new WaitForStore(storeName), - maxWaitMs, - "waiting for store " + storeName); final ReadOnlyKeyValueStore store2 = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore())); + IntegrationTestUtils.getStore(storeName, kafkaStreams, QueryableStoreTypes.keyValueStore()); try { TestUtils.waitForCondition( diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java index 0bc465ad1cd70..68195b536e32b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java @@ -119,18 +119,10 @@ public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception { final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); final QueryableStoreType> queryableStoreType = QueryableStoreTypes.keyValueStore(); - final ReadOnlyKeyValueStore store1 = kafkaStreams1 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType)); + final ReadOnlyKeyValueStore store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType); + final ReadOnlyKeyValueStore store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType); - final ReadOnlyKeyValueStore store2 = kafkaStreams2 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType)); - - final boolean kafkaStreams1IsActive; - if ((keyQueryMetadata.getActiveHost().port() % 2) == 1) { - kafkaStreams1IsActive = true; - } else { - kafkaStreams1IsActive = false; - } + final boolean kafkaStreams1IsActive = (keyQueryMetadata.getActiveHost().port() % 2) == 1; // Assert that only active is able to query for a key by default assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue())); @@ -167,22 +159,17 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception { //key doesn't belongs to this partition final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0; - final boolean kafkaStreams1IsActive; - if ((keyQueryMetadata.getActiveHost().port() % 2) == 1) { - kafkaStreams1IsActive = true; - } else { - kafkaStreams1IsActive = false; - } + final boolean kafkaStreams1IsActive = (keyQueryMetadata.getActiveHost().port() % 2) == 1; - final QueryableStoreType> queryableStoreType = QueryableStoreTypes.keyValueStore(); + StoreQueryParameters> storeQueryParam = + StoreQueryParameters.>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore()) + .withPartition(keyPartition); ReadOnlyKeyValueStore store1 = null; ReadOnlyKeyValueStore store2 = null; if (kafkaStreams1IsActive) { - store1 = kafkaStreams1 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).withPartition(keyPartition)); + store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam); } else { - store2 = kafkaStreams2 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).withPartition(keyPartition)); + store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam); } if (kafkaStreams1IsActive) { @@ -196,14 +183,14 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception { // Assert that only active for a specific requested partition serves key if stale stores and not enabled assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue())); + storeQueryParam = StoreQueryParameters.>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore()) + .withPartition(keyDontBelongPartition); ReadOnlyKeyValueStore store3 = null; ReadOnlyKeyValueStore store4 = null; if (!kafkaStreams1IsActive) { - store3 = kafkaStreams1 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).withPartition(keyDontBelongPartition)); + store3 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam); } else { - store4 = kafkaStreams2 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).withPartition(keyDontBelongPartition)); + store4 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam); } // Assert that key is not served when wrong specific partition is requested @@ -240,13 +227,13 @@ public void shouldQueryAllStalePartitionStores() throws Exception { // Assert that both active and standby are able to query for a key TestUtils.waitForCondition(() -> { - final ReadOnlyKeyValueStore store1 = kafkaStreams1 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores()); + final ReadOnlyKeyValueStore store1 = IntegrationTestUtils + .getStore(TABLE_NAME, kafkaStreams1, true, queryableStoreType); return store1.get(key) != null; }, "store1 cannot find results for key"); TestUtils.waitForCondition(() -> { - final ReadOnlyKeyValueStore store2 = kafkaStreams2 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores()); + final ReadOnlyKeyValueStore store2 = IntegrationTestUtils + .getStore(TABLE_NAME, kafkaStreams2, true, queryableStoreType); return store2.get(key) != null; }, "store2 cannot find results for key"); } @@ -284,22 +271,25 @@ public void shouldQuerySpecificStalePartitionStores() throws Exception { final QueryableStoreType> queryableStoreType = QueryableStoreTypes.keyValueStore(); // Assert that both active and standby are able to query for a key + final StoreQueryParameters> param = StoreQueryParameters + .fromNameAndType(TABLE_NAME, queryableStoreType) + .enableStaleStores() + .withPartition(keyPartition); TestUtils.waitForCondition(() -> { - final ReadOnlyKeyValueStore store1 = kafkaStreams1 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores().withPartition(keyPartition)); + final ReadOnlyKeyValueStore store1 = IntegrationTestUtils.getStore(kafkaStreams1, param); return store1.get(key) != null; }, "store1 cannot find results for key"); TestUtils.waitForCondition(() -> { - final ReadOnlyKeyValueStore store2 = kafkaStreams2 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores().withPartition(keyPartition)); + final ReadOnlyKeyValueStore store2 = IntegrationTestUtils.getStore(kafkaStreams2, param); return store2.get(key) != null; }, "store2 cannot find results for key"); - final ReadOnlyKeyValueStore store3 = kafkaStreams1 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores().withPartition(keyDontBelongPartition)); - - final ReadOnlyKeyValueStore store4 = kafkaStreams2 - .store(StoreQueryParameters.fromNameAndType(TABLE_NAME, queryableStoreType).enableStaleStores().withPartition(keyDontBelongPartition)); + final StoreQueryParameters> otherParam = StoreQueryParameters + .fromNameAndType(TABLE_NAME, queryableStoreType) + .enableStaleStores() + .withPartition(keyDontBelongPartition); + final ReadOnlyKeyValueStore store3 = IntegrationTestUtils.getStore(kafkaStreams1, otherParam); + final ReadOnlyKeyValueStore store4 = IntegrationTestUtils.getStore(kafkaStreams2, otherParam); // Assert that assertThat(store3.get(key), is(nullValue())); @@ -332,7 +322,7 @@ private Properties streamsConfiguration() { final Properties config = new Properties(); config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); - config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + String.valueOf(++port)); + config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + (++port)); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(applicationId).getPath()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java index 007dc26dece07..d296a3dac6d7c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java @@ -23,7 +23,6 @@ import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Windowed; @@ -337,8 +336,11 @@ private void processKeyValueAndVerifyPlainCount(final K key, TestUtils.waitForCondition( () -> { try { - final ReadOnlyKeyValueStore store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); + final ReadOnlyKeyValueStore store = IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore()); + + if (store == null) + return false; + try (final KeyValueIterator all = store.all()) { final List> storeContent = new LinkedList<>(); while (all.hasNext()) { @@ -362,8 +364,12 @@ private void verifyCountWithTimestamp(final K key, TestUtils.waitForCondition( () -> { try { - final ReadOnlyKeyValueStore> store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore())); + final ReadOnlyKeyValueStore> store = IntegrationTestUtils + .getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore()); + + if (store == null) + return false; + final ValueAndTimestamp count = store.get(key); return count.value() == value && count.timestamp() == timestamp; } catch (final Exception swallow) { @@ -381,8 +387,12 @@ private void verifyCountWithSurrogateTimestamp(final K key, TestUtils.waitForCondition( () -> { try { - final ReadOnlyKeyValueStore> store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore())); + final ReadOnlyKeyValueStore> store = IntegrationTestUtils + .getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore()); + + if (store == null) + return false; + final ValueAndTimestamp count = store.get(key); return count.value() == value && count.timestamp() == -1L; } catch (final Exception swallow) { @@ -411,8 +421,12 @@ private void processKeyValueAndVerifyCount(final K key, TestUtils.waitForCondition( () -> { try { - final ReadOnlyKeyValueStore> store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore())); + final ReadOnlyKeyValueStore> store = IntegrationTestUtils + .getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore()); + + if (store == null) + return false; + try (final KeyValueIterator> all = store.all()) { final List>> storeContent = new LinkedList<>(); while (all.hasNext()) { @@ -446,8 +460,12 @@ private void processKeyValueAndVerifyCountWithTimestamp(final K key, TestUtils.waitForCondition( () -> { try { - final ReadOnlyKeyValueStore> store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedKeyValueStore())); + final ReadOnlyKeyValueStore> store = IntegrationTestUtils + .getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore()); + + if (store == null) + return false; + try (final KeyValueIterator> all = store.all()) { final List>> storeContent = new LinkedList<>(); while (all.hasNext()) { @@ -812,8 +830,12 @@ private void processWindowedKeyValueAndVerifyPlainCount(final K key, TestUtils.waitForCondition( () -> { try { - final ReadOnlyWindowStore store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.windowStore())); + final ReadOnlyWindowStore store = IntegrationTestUtils + .getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.windowStore()); + + if (store == null) + return false; + try (final KeyValueIterator, V> all = store.all()) { final List, V>> storeContent = new LinkedList<>(); while (all.hasNext()) { @@ -836,8 +858,12 @@ private void verifyWindowedCountWithSurrogateTimestamp(final Windowed key TestUtils.waitForCondition( () -> { try { - final ReadOnlyWindowStore> store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedWindowStore())); + final ReadOnlyWindowStore> store = IntegrationTestUtils + .getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.timestampedWindowStore()); + + if (store == null) + return false; + final ValueAndTimestamp count = store.fetch(key.key(), key.window().start()); return count.value() == value && count.timestamp() == -1L; } catch (final Exception swallow) { @@ -856,8 +882,12 @@ private void verifyWindowedCountWithTimestamp(final Windowed key, TestUtils.waitForCondition( () -> { try { - final ReadOnlyWindowStore> store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedWindowStore())); + final ReadOnlyWindowStore> store = IntegrationTestUtils + .getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.timestampedWindowStore()); + + if (store == null) + return false; + final ValueAndTimestamp count = store.fetch(key.key(), key.window().start()); return count.value() == value && count.timestamp() == timestamp; } catch (final Exception swallow) { @@ -886,8 +916,12 @@ private void processKeyValueAndVerifyWindowedCountWithTimestamp(final K k TestUtils.waitForCondition( () -> { try { - final ReadOnlyWindowStore> store = - kafkaStreams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.timestampedWindowStore())); + final ReadOnlyWindowStore> store = IntegrationTestUtils + .getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.timestampedWindowStore()); + + if (store == null) + return false; + try (final KeyValueIterator, ValueAndTimestamp> all = store.all()) { final List, ValueAndTimestamp>> storeContent = new LinkedList<>(); while (all.hasNext()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 6c88e68005e34..122c0e94533c5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -41,10 +41,13 @@ import org.apache.kafka.streams.KafkaStreams.StateListener; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StoreQueryParameters; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator; +import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import scala.Option; @@ -160,8 +163,7 @@ public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final * @param Value type of the data records */ public static void produceKeyValuesSynchronously( - final String topic, final Collection> records, final Properties producerConfig, final Time time) - throws ExecutionException, InterruptedException { + final String topic, final Collection> records, final Properties producerConfig, final Time time) { produceKeyValuesSynchronously(topic, records, producerConfig, time, false); } @@ -175,8 +177,7 @@ public static void produceKeyValuesSynchronously( * @param Value type of the data records */ public static void produceKeyValuesSynchronously( - final String topic, final Collection> records, final Properties producerConfig, final Headers headers, final Time time) - throws ExecutionException, InterruptedException { + final String topic, final Collection> records, final Properties producerConfig, final Headers headers, final Time time) { produceKeyValuesSynchronously(topic, records, producerConfig, headers, time, false); } @@ -190,8 +191,7 @@ public static void produceKeyValuesSynchronously( * @param Value type of the data records */ public static void produceKeyValuesSynchronously( - final String topic, final Collection> records, final Properties producerConfig, final Time time, final boolean enableTransactions) - throws ExecutionException, InterruptedException { + final String topic, final Collection> records, final Properties producerConfig, final Time time, final boolean enableTransactions) { produceKeyValuesSynchronously(topic, records, producerConfig, null, time, enableTransactions); } @@ -210,16 +210,21 @@ public static void produceKeyValuesSynchronously(final String topic, final Properties producerConfig, final Headers headers, final Time time, - final boolean enableTransactions) - throws ExecutionException, InterruptedException { - for (final KeyValue record : records) { - produceKeyValuesSynchronouslyWithTimestamp(topic, - Collections.singleton(record), - producerConfig, - headers, - time.milliseconds(), - enableTransactions); - time.sleep(1L); + final boolean enableTransactions) { + + try (final Producer producer = new KafkaProducer<>(producerConfig)) { + if (enableTransactions) { + producer.initTransactions(); + producer.beginTransaction(); + } + for (final KeyValue record : records) { + producer.send(new ProducerRecord<>(topic, null, time.milliseconds(), record.key, record.value, headers)); + time.sleep(1L); + } + if (enableTransactions) { + producer.commitTransaction(); + } + producer.flush(); } } @@ -234,8 +239,7 @@ public static void produceKeyValuesSynchronously(final String topic, public static void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection> records, final Properties producerConfig, - final Long timestamp) - throws ExecutionException, InterruptedException { + final Long timestamp) { produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, timestamp, false); } @@ -248,13 +252,11 @@ public static void produceKeyValuesSynchronouslyWithTimestamp(final Strin * @param Key type of the data records * @param Value type of the data records */ - @SuppressWarnings("WeakerAccess") public static void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection> records, final Properties producerConfig, final Long timestamp, - final boolean enableTransactions) - throws ExecutionException, InterruptedException { + final boolean enableTransactions) { produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, null, timestamp, enableTransactions); } @@ -269,14 +271,12 @@ public static void produceKeyValuesSynchronouslyWithTimestamp(final Strin * @param Key type of the data records * @param Value type of the data records */ - @SuppressWarnings("WeakerAccess") public static void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection> records, final Properties producerConfig, final Headers headers, final Long timestamp, - final boolean enableTransactions) - throws ExecutionException, InterruptedException { + final boolean enableTransactions) { try (final Producer producer = new KafkaProducer<>(producerConfig)) { if (enableTransactions) { @@ -284,9 +284,7 @@ public static void produceKeyValuesSynchronouslyWithTimestamp(final Strin producer.beginTransaction(); } for (final KeyValue record : records) { - final Future f = producer.send( - new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers)); - f.get(); + producer.send(new ProducerRecord<>(topic, null, timestamp, record.key, record.value, headers)); } if (enableTransactions) { producer.commitTransaction(); @@ -367,8 +365,7 @@ public static void produceAbortedKeyValuesSynchronouslyWithTimestamp(fina public static void produceValuesSynchronously(final String topic, final Collection records, final Properties producerConfig, - final Time time) - throws ExecutionException, InterruptedException { + final Time time) { produceValuesSynchronously(topic, records, producerConfig, time, false); } @@ -385,8 +382,7 @@ public static void produceValuesSynchronously(final String topic, final Collection records, final Properties producerConfig, final Time time, - final boolean enableTransactions) - throws ExecutionException, InterruptedException { + final boolean enableTransactions) { final Collection> keyedRecords = new ArrayList<>(); for (final V value : records) { final KeyValue kv = new KeyValue<>(null, value); @@ -599,13 +595,6 @@ public static List> waitUntilFinalKeyValueRecordsReceived( return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, false); } - public static List> waitUntilFinalKeyValueTimestampRecordsReceived(final Properties consumerConfig, - final String topic, - final List> expectedRecords, - final long waitTime) throws InterruptedException { - return waitUntilFinalKeyValueRecordsReceived(consumerConfig, topic, expectedRecords, waitTime, true); - } - @SuppressWarnings("unchecked") private static List waitUntilFinalKeyValueRecordsReceived(final Properties consumerConfig, final String topic, @@ -708,7 +697,7 @@ public static void waitForTopicPartitions(final List servers, } } - public static void waitUntilMetadataIsPropagated(final List servers, + private static void waitUntilMetadataIsPropagated(final List servers, final String topic, final int partition, final long timeout) throws InterruptedException { @@ -732,7 +721,6 @@ public static void waitUntilMetadataIsPropagated(final List servers final UpdateMetadataPartitionState metadataPartitionState = partitionInfo.get(); if (!Request.isValidBrokerId(metadataPartitionState.leader())) { invalidBrokerIds.add(server); - continue; } } @@ -969,52 +957,6 @@ private static String printRecords(final List> resul return resultStr.toString(); } - /** - * Returns up to `maxMessages` message-values from the topic. - * - * @param topic Kafka topic to read messages from - * @param consumerConfig Kafka consumer config - * @param waitTime Maximum wait time in milliseconds - * @param maxMessages Maximum number of messages to read via the consumer. - * @return The values retrieved via the consumer. - */ - public static List readValues(final String topic, final Properties consumerConfig, - final long waitTime, final int maxMessages) { - final List returnList; - try (final Consumer consumer = createConsumer(consumerConfig)) { - returnList = readValues(topic, consumer, waitTime, maxMessages); - } - return returnList; - } - - /** - * Returns up to `maxMessages` by reading via the provided consumer (the topic(s) to read from - * are already configured in the consumer). - * - * @param topic Kafka topic to read messages from - * @param consumerConfig Kafka consumer config - * @param waitTime Maximum wait time in milliseconds - * @param maxMessages Maximum number of messages to read via the consumer - * @return The KeyValue elements retrieved via the consumer - */ - public static List> readKeyValues(final String topic, - final Properties consumerConfig, final long waitTime, final int maxMessages) { - final List> consumedValues; - try (final Consumer consumer = createConsumer(consumerConfig)) { - consumedValues = readKeyValues(topic, consumer, waitTime, maxMessages); - } - return consumedValues; - } - - public static KafkaStreams getStartedStreams(final Properties streamsConfig, final StreamsBuilder builder, final boolean clean) { - final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig); - if (clean) { - driver.cleanUp(); - } - driver.start(); - return driver; - } - /** * Returns up to `maxMessages` message-values from the topic. * @@ -1119,4 +1061,70 @@ private static KafkaConsumer createConsumer(final Properties consum filtered.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); return new KafkaConsumer<>(filtered); } + + public static KafkaStreams getStartedStreams(final Properties streamsConfig, final StreamsBuilder builder, final boolean clean) { + final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig); + if (clean) { + driver.cleanUp(); + } + driver.start(); + return driver; + } + + public static S getStore(final String storeName, + final KafkaStreams streams, + final QueryableStoreType storeType) throws InterruptedException { + return getStore(DEFAULT_TIMEOUT, storeName, streams, storeType); + } + + public static S getStore(final String storeName, + final KafkaStreams streams, + final boolean enableStaleQuery, + final QueryableStoreType storeType) throws InterruptedException { + return getStore(DEFAULT_TIMEOUT, storeName, streams, enableStaleQuery, storeType); + } + + public static S getStore(final long waitTime, + final String storeName, + final KafkaStreams streams, + final QueryableStoreType storeType) throws InterruptedException { + return getStore(waitTime, storeName, streams, false, storeType); + } + + public static S getStore(final long waitTime, + final String storeName, + final KafkaStreams streams, + final boolean enableStaleQuery, + final QueryableStoreType storeType) throws InterruptedException { + final StoreQueryParameters param = enableStaleQuery ? + StoreQueryParameters.fromNameAndType(storeName, storeType).enableStaleStores() : + StoreQueryParameters.fromNameAndType(storeName, storeType); + return getStore(waitTime, streams, param); + } + + public static S getStore(final KafkaStreams streams, + final StoreQueryParameters param) throws InterruptedException { + return getStore(DEFAULT_TIMEOUT, streams, param); + } + + public static S getStore(final long waitTime, + final KafkaStreams streams, + final StoreQueryParameters param) throws InterruptedException { + final long expectedEnd = System.currentTimeMillis() + waitTime; + + while (true) { + try { + return streams.store(param); + } catch (final InvalidStateStoreException e) { + if (System.currentTimeMillis() > expectedEnd) { + throw e; + } + } catch (final Exception e) { + if (System.currentTimeMillis() > expectedEnd) { + throw new AssertionError(e); + } + } + Thread.sleep(Math.min(100L, waitTime)); + } + } } From 57a2c39bcded8ecbdf0f443520ed7385f4ce0dbf Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Tue, 28 Apr 2020 15:02:40 -0700 Subject: [PATCH 2/2] github comments --- .../streams/integration/EosIntegrationTest.java | 2 +- .../integration/GlobalKTableEOSIntegrationTest.java | 13 ++++++++----- .../integration/StoreUpgradeIntegrationTest.java | 3 ++- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index f179d4539d39a..fa65766d962ad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -849,7 +849,7 @@ private Set> getMaxPerKey(final List> private void verifyStateStore(final KafkaStreams streams, final Set> expectedStoreContent) throws InterruptedException { final ReadOnlyKeyValueStore store = IntegrationTestUtils - .getStore(300000L, storeName, streams, QueryableStoreTypes.keyValueStore()); + .getStore(300_000L, storeName, streams, QueryableStoreTypes.keyValueStore()); assertNotNull(store); final KeyValueIterator it = store.all(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java index 18ed52718aa5c..2850a5190dc38 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableEOSIntegrationTest.java @@ -113,11 +113,13 @@ public void before() throws Exception { .replace(']', '_'); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); - streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig); + streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); + streamsConfiguration.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 300); globalTable = builder.globalTable(globalTableTopic, Consumed.with(Serdes.Long(), Serdes.String()), Materialized.>as(globalStore) .withKeySerde(Serdes.Long()) @@ -128,7 +130,7 @@ public void before() throws Exception { } @After - public void whenShuttingDown() throws Exception { + public void after() throws Exception { if (kafkaStreams != null) { kafkaStreams.close(); } @@ -294,6 +296,7 @@ private void createTopics() throws Exception { .replace(']', '_'); streamTopic = "stream-" + suffix; globalTableTopic = "globalTable-" + suffix; + CLUSTER.deleteAllTopicsAndWait(300_000L); CLUSTER.createTopics(streamTopic); CLUSTER.createTopic(globalTableTopic, 2, 1); } @@ -303,7 +306,7 @@ private void startStreams() { kafkaStreams.start(); } - private void produceTopicValues(final String topic) throws Exception { + private void produceTopicValues(final String topic) { IntegrationTestUtils.produceKeyValuesSynchronously( topic, Arrays.asList( @@ -339,7 +342,7 @@ private void produceAbortedMessages() throws Exception { mockTime.milliseconds()); } - private void produceInitialGlobalTableValues() throws Exception { + private void produceInitialGlobalTableValues() { final Properties properties = new Properties(); properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "someid"); properties.put(ProducerConfig.RETRIES_CONFIG, 1); @@ -360,7 +363,7 @@ private void produceInitialGlobalTableValues() throws Exception { true); } - private void produceGlobalTableValues() throws Exception { + private void produceGlobalTableValues() { IntegrationTestUtils.produceKeyValuesSynchronously( globalTableTopic, Arrays.asList( diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java index d296a3dac6d7c..9db1e6b9ba82b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java @@ -338,8 +338,9 @@ private void processKeyValueAndVerifyPlainCount(final K key, try { final ReadOnlyKeyValueStore store = IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore()); - if (store == null) + if (store == null) { return false; + } try (final KeyValueIterator all = store.all()) { final List> storeContent = new LinkedList<>();