Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/streams/upgrade-guide.html
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ <h1>Upgrade Guide and API Changes</h1>
</p>

<h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API changes in 3.0.0</a></h3>
<p>
A new exception may be thrown from <code>KafkaStreams#store()</code>. If the specified store name does not exist in the topology, an UnknownStateStoreException will be thrown instead of the former InvalidStateStoreException. See <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-216%3A+IQ+should+throw+different+exceptions+for+different+errors">KIP-216</a> for more information.
</p>
<p>
We removed the default implementation of <code>RocksDBConfigSetter#close()</code>.
</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateRestoreListener;
Expand Down Expand Up @@ -1515,8 +1516,8 @@ public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
*
* @param storeQueryParameters the parameters used to fetch a queryable store
* @return A facade wrapping the local {@link StateStore} instances
* @throws InvalidStateStoreException If the specified store name does not exist in the topology
* or if the Streams instance isn't in a queryable state.
* @throws UnknownStateStoreException If the specified store name does not exist in the topology.
* @throws InvalidStateStoreException If the Streams instance isn't in a queryable state.
* If the store's type does not match the QueryableStoreType,
* the Streams instance is not in a queryable state with respect
* to the parameters, or if the store is not available locally, then
Expand All @@ -1526,7 +1527,7 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
final String storeName = storeQueryParameters.storeName();
if ((taskTopology == null || !taskTopology.hasStore(storeName))
&& (globalTaskTopology == null || !globalTaskTopology.hasStore(storeName))) {
throw new InvalidStateStoreException(
throw new UnknownStateStoreException(
"Cannot get state store " + storeName + " because no such store is registered in the topology."
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateRestoreListener;
Expand Down Expand Up @@ -91,6 +92,7 @@

import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyLong;
Expand Down Expand Up @@ -745,6 +747,14 @@ public void shouldNotGetQueryMetadataWithPartitionerWhenNotRunningOrRebalancing(
assertThrows(IllegalStateException.class, () -> streams.queryMetadataForKey("store", "key", (topic, key, value, numPartitions) -> 0));
}

@Test
public void shouldThrowUnknownStateStoreExceptionWhenStoreNotExist() {
try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
streams.start();
assertThrows(UnknownStateStoreException.class, () -> streams.store(StoreQueryParameters.fromNameAndType("unknown-store", keyValueStore())));
}
}

@Test
public void shouldReturnEmptyLocalStorePartitionLags() {
// Mock all calls made to compute the offset lags,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
Expand Down Expand Up @@ -119,9 +119,9 @@ public void providingAJoinStoreNameShouldNotMakeTheJoinResultQueriable() throws

kafkaStreams.start();
latch.await();
final InvalidStateStoreException exception =
final UnknownStateStoreException exception =
assertThrows(
InvalidStateStoreException.class,
UnknownStateStoreException.class,
() -> kafkaStreams.store(fromNameAndType("join-store", keyValueStore()))
);
assertThat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
Expand Down Expand Up @@ -470,8 +471,8 @@ public void shouldRejectNonExistentStoreName() throws InterruptedException {
streams.store(fromNameAndType(storeName, keyValueStore()));
assertThat(store, Matchers.notNullValue());

final InvalidStateStoreException exception = assertThrows(
InvalidStateStoreException.class,
final UnknownStateStoreException exception = assertThrows(
UnknownStateStoreException.class,
() -> streams.store(fromNameAndType("no-table", keyValueStore()))
);
assertThat(
Expand Down