From 12a91a5768e3b56e0b43efffcbf5619545900397 Mon Sep 17 00:00:00 2001
From: vitojeng
Date: Mon, 4 Jan 2021 14:42:56 +0800
Subject: [PATCH 1/5] KAFKA-5876: Apply UnknownStateStoreException for
Interactive Queries
---
.../main/java/org/apache/kafka/streams/KafkaStreams.java | 7 ++++---
.../java/org/apache/kafka/streams/KafkaStreamsTest.java | 9 +++++++++
.../streams/integration/JoinStoreIntegrationTest.java | 5 +++--
.../integration/QueryableStateIntegrationTest.java | 5 +++--
4 files changed, 19 insertions(+), 7 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c1b4369d503a0..b7eda15dc8e24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -44,6 +44,7 @@
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -1515,8 +1516,8 @@ public KeyQueryMetadata queryMetadataForKey(final String storeName,
*
* @param storeQueryParameters the parameters used to fetch a queryable store
* @return A facade wrapping the local {@link StateStore} instances
- * @throws InvalidStateStoreException If the specified store name does not exist in the topology
- * or if the Streams instance isn't in a queryable state.
+ * @throws UnknownStateStoreException If the specified store name does not exist in the topology.
+ * @throws InvalidStateStoreException If the Streams instance isn't in a queryable state.
* If the store's type does not match the QueryableStoreType,
* the Streams instance is not in a queryable state with respect
* to the parameters, or if the store is not available locally, then
@@ -1526,7 +1527,7 @@ public T store(final StoreQueryParameters storeQueryParameters) {
final String storeName = storeQueryParameters.storeName();
if ((taskTopology == null || !taskTopology.hasStore(storeName))
&& (globalTaskTopology == null || !globalTaskTopology.hasStore(storeName))) {
- throw new InvalidStateStoreException(
+ throw new UnknownStateStoreException(
"Cannot get state store " + storeName + " because no such store is registered in the topology."
);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index b3dd559d88d63..6746e89f5bc31 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -36,6 +36,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -91,6 +92,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyLong;
@@ -745,6 +747,13 @@ public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing(
assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0));
}
+ @Test(expected = UnknownStateStoreException.class)
+ public void shouldThrowUnknownStateStoreExceptionWhenStoreNotExist() throws Exception {
+ final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+ streams.start();
+ streams.store(StoreQueryParameters.fromNameAndType("unknown-store", keyValueStore()));
+ }
+
@Test
public void shouldReturnEmptyLocalStorePartitionLags() {
// Mock all calls made to compute the offset lags,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index c46af43bac1b0..d3b9e48b7b702 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -22,6 +22,7 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
@@ -119,9 +120,9 @@ public void providingAJoinStoreNameShouldNotMakeTheJoinResultQueriable() throws
kafkaStreams.start();
latch.await();
- final InvalidStateStoreException exception =
+ final UnknownStateStoreException exception =
assertThrows(
- InvalidStateStoreException.class,
+ UnknownStateStoreException.class,
() -> kafkaStreams.store(fromNameAndType("join-store", keyValueStore()))
);
assertThat(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index f023977e309da..cd1dd0dca61d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -37,6 +37,7 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
@@ -470,8 +471,8 @@ public void shouldRejectNonExistentStoreName() throws InterruptedException {
streams.store(fromNameAndType(storeName, keyValueStore()));
assertThat(store, Matchers.notNullValue());
- final InvalidStateStoreException exception = assertThrows(
- InvalidStateStoreException.class,
+ final UnknownStateStoreException exception = assertThrows(
+ UnknownStateStoreException.class,
() -> streams.store(fromNameAndType("no-table", keyValueStore()))
);
assertThat(
From fc06406a7473e7340eb3f99dbde6c3689e387f39 Mon Sep 17 00:00:00 2001
From: vitojeng
Date: Mon, 4 Jan 2021 15:18:40 +0800
Subject: [PATCH 2/5] fix checkstyle error
---
.../kafka/streams/integration/JoinStoreIntegrationTest.java | 1 -
1 file changed, 1 deletion(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index d3b9e48b7b702..f5ed891afc1f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -21,7 +21,6 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Consumed;
From 5745d1d958c82c6885f8a9f9fd40ccd19956b44c Mon Sep 17 00:00:00 2001
From: vitojeng
Date: Wed, 13 Jan 2021 15:58:13 +0800
Subject: [PATCH 3/5] address comment - use assertThrows and try-with-resources
---
.../org/apache/kafka/streams/KafkaStreamsTest.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 6746e89f5bc31..82e8d6b2425c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -747,11 +747,12 @@ public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing(
assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0));
}
- @Test(expected = UnknownStateStoreException.class)
- public void shouldThrowUnknownStateStoreExceptionWhenStoreNotExist() throws Exception {
- final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
- streams.start();
- streams.store(StoreQueryParameters.fromNameAndType("unknown-store", keyValueStore()));
+ @Test
+ public void shouldThrowUnknownStateStoreExceptionWhenStoreNotExist() {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ streams.start();
+ assertThrows(UnknownStateStoreException.class, () -> streams.store(StoreQueryParameters.fromNameAndType("unknown-store", keyValueStore())));
+ }
}
@Test
From 49d744d178ecb3411f4f7ebf94fb41cf62eb2823 Mon Sep 17 00:00:00 2001
From: vitojeng
Date: Tue, 20 Apr 2021 11:05:40 +0800
Subject: [PATCH 4/5] address comment - update streams upgrade guide
---
docs/streams/upgrade-guide.html | 3 +++
1 file changed, 3 insertions(+)
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index ea410b13e331c..6e59a78fa374a 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -93,6 +93,9 @@ Upgrade Guide and API Changes
+
+ We applyed UnknownStateStoreException to KafkaStreams#store(): If the specified store name does not exist in the topology, UnknownStateStoreException will be thrown instead InvalidStateStoreException(KIP-216).
+
We removed the default implementation of RocksDBConfigSetter#close().
From b36f6371569d77c9975263728f71a99b8429751a Mon Sep 17 00:00:00 2001
From: Vito Jeng
Date: Fri, 23 Apr 2021 08:32:14 +0800
Subject: [PATCH 5/5] address comment - update streams upgrade guide
Co-authored-by: A. Sophie Blee-Goldman
---
docs/streams/upgrade-guide.html | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 6e59a78fa374a..cd0f67773d244 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -94,7 +94,7 @@ Upgrade Guide and API Changes
- We applyed UnknownStateStoreException to KafkaStreams#store(): If the specified store name does not exist in the topology, UnknownStateStoreException will be thrown instead InvalidStateStoreException(KIP-216).
+ A new exception may be thrown from KafkaStreams#store(). If the specified store name does not exist in the topology, an UnknownStateStoreException will be thrown instead of the former InvalidStateStoreException. See KIP-216 for more information.
We removed the default implementation of RocksDBConfigSetter#close().