diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index ea410b13e331c..cd0f67773d244 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -93,6 +93,9 @@
Upgrade Guide and API Changes
+
+ A new exception may be thrown from KafkaStreams#store(). If the specified store name does not exist in the topology, an UnknownStateStoreException will be thrown instead of the former InvalidStateStoreException. See KIP-216 for more information.
+
We removed the default implementation of RocksDBConfigSetter#close().
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index c1b4369d503a0..b7eda15dc8e24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -44,6 +44,7 @@
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -1515,8 +1516,8 @@ public KeyQueryMetadata queryMetadataForKey(final String storeName,
*
* @param storeQueryParameters the parameters used to fetch a queryable store
* @return A facade wrapping the local {@link StateStore} instances
- * @throws InvalidStateStoreException If the specified store name does not exist in the topology
- * or if the Streams instance isn't in a queryable state.
+ * @throws UnknownStateStoreException If the specified store name does not exist in the topology.
+ * @throws InvalidStateStoreException If the Streams instance isn't in a queryable state.
* If the store's type does not match the QueryableStoreType,
* the Streams instance is not in a queryable state with respect
* to the parameters, or if the store is not available locally, then
@@ -1526,7 +1527,7 @@ public T store(final StoreQueryParameters storeQueryParameters) {
final String storeName = storeQueryParameters.storeName();
if ((taskTopology == null || !taskTopology.hasStore(storeName))
&& (globalTaskTopology == null || !globalTaskTopology.hasStore(storeName))) {
- throw new InvalidStateStoreException(
+ throw new UnknownStateStoreException(
"Cannot get state store " + storeName + " because no such store is registered in the topology."
);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index b3dd559d88d63..82e8d6b2425c7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -36,6 +36,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -91,6 +92,7 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyLong;
@@ -745,6 +747,14 @@ public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing(
assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0));
}
+ @Test
+ public void shouldThrowUnknownStateStoreExceptionWhenStoreNotExist() {
+ try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+ streams.start();
+ assertThrows(UnknownStateStoreException.class, () -> streams.store(StoreQueryParameters.fromNameAndType("unknown-store", keyValueStore())));
+ }
+ }
+
@Test
public void shouldReturnEmptyLocalStorePartitionLags() {
// Mock all calls made to compute the offset lags,
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
index c46af43bac1b0..f5ed891afc1f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
@@ -21,7 +21,7 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
@@ -119,9 +119,9 @@ public void providingAJoinStoreNameShouldNotMakeTheJoinResultQueriable() throws
kafkaStreams.start();
latch.await();
- final InvalidStateStoreException exception =
+ final UnknownStateStoreException exception =
assertThrows(
- InvalidStateStoreException.class,
+ UnknownStateStoreException.class,
() -> kafkaStreams.store(fromNameAndType("join-store", keyValueStore()))
);
assertThat(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index f023977e309da..cd1dd0dca61d1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -37,6 +37,7 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
@@ -470,8 +471,8 @@ public void shouldRejectNonExistentStoreName() throws InterruptedException {
streams.store(fromNameAndType(storeName, keyValueStore()));
assertThat(store, Matchers.notNullValue());
- final InvalidStateStoreException exception = assertThrows(
- InvalidStateStoreException.class,
+ final UnknownStateStoreException exception = assertThrows(
+ UnknownStateStoreException.class,
() -> streams.store(fromNameAndType("no-table", keyValueStore()))
);
assertThat(