From e1e99fe2717dd9b9e2d5d7402b5f69e2c47337a9 Mon Sep 17 00:00:00 2001 From: Bruno Cadonna Date: Tue, 12 Jan 2021 13:56:59 +0100 Subject: [PATCH] MINOR: Fix flaky test shouldQuerySpecificActivePartitionStores --- .../streams/integration/StoreQueryIntegrationTest.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java index 9142314689758..38760694a9a5f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java @@ -208,20 +208,16 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception { assertThat(store1, is(nullValue())); } - // Assert that only active for a specific requested partition serves key if stale stores and not enabled - assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue())); - final StoreQueryParameters> storeQueryParam2 = StoreQueryParameters.>fromNameAndType(TABLE_NAME, keyValueStore()) .withPartition(keyDontBelongPartition); - - try { // Assert that key is not served when wrong specific partition is requested // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested if (kafkaStreams1IsActive) { + assertThat(store1.get(key), is(notNullValue())); assertThat(getStore(kafkaStreams2, storeQueryParam2).get(key), is(nullValue())); final InvalidStateStoreException exception = assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams1, storeQueryParam2).get(key)); @@ -230,6 +226,7 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception { containsString("The specified partition 1 for store source-table does not exist.") ); } else { + assertThat(store2.get(key), is(notNullValue())); assertThat(getStore(kafkaStreams1, storeQueryParam2).get(key), is(nullValue())); final InvalidStateStoreException exception = assertThrows(InvalidStateStoreException.class, () -> getStore(kafkaStreams2, storeQueryParam2).get(key));