From 2fb966ff9d3f030c1b6c54e52bcee8032fada5fb Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Tue, 14 Jul 2020 17:46:23 +0300 Subject: [PATCH 01/15] Performance issue, in case of withPartition parameter exists - do not return all state stores --- .../streams/state/internals/WrappingStoreProvider.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java index 5c9ae1a4370af..3deb2d37ababf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java @@ -48,7 +48,12 @@ public List stores(final String storeName, final List allStores = new ArrayList<>(); for (final StreamThreadStateStoreProvider provider : storeProviders) { final List stores = provider.stores(storeQueryParameters); - allStores.addAll(stores); + if (!stores.isEmpty()) { + allStores.addAll(stores); + if (storeQueryParameters.partition() != null) { + break; + } + } } if (allStores.isEmpty()) { throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance."); From 839379de35fe41c044f84aeffcf414a3c3bb91ff Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Wed, 15 Jul 2020 14:35:55 +0300 Subject: [PATCH 02/15] Find task in-place, avoid the synchronized call to topicGroups() --- .../apache/kafka/streams/KafkaStreams.java | 2 +- .../StreamThreadStateStoreProvider.java | 38 +++++++------------ .../StreamThreadStateStoreProviderTest.java | 2 +- .../kafka/test/StateStoreProviderStub.java | 2 +- 4 files changed, 16 insertions(+), 28 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index b3900c260c746..b26c6e3a7ced6 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -784,7 +784,7 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, delegatingStateRestoreListener, i + 1); threadState.put(threads[i].getId(), threads[i].state()); - storeProviders.add(new StreamThreadStateStoreProvider(threads[i], internalTopologyBuilder)); + storeProviders.add(new StreamThreadStateStoreProvider(threads[i])); } ClientMetrics.addNumAliveStreamThreadMetric(streamsMetrics, (metricsConfig, now) -> diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index 7cc263a861ebb..78d55a888b40b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -20,7 +20,6 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.state.QueryableStoreType; @@ -30,27 +29,21 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; public class StreamThreadStateStoreProvider { private final StreamThread streamThread; - private final InternalTopologyBuilder internalTopologyBuilder; - public StreamThreadStateStoreProvider(final StreamThread streamThread, - final InternalTopologyBuilder internalTopologyBuilder) { + public StreamThreadStateStoreProvider(final 
StreamThread streamThread) { this.streamThread = streamThread; - this.internalTopologyBuilder = internalTopologyBuilder; } @SuppressWarnings("unchecked") public List stores(final StoreQueryParameters storeQueryParams) { final String storeName = storeQueryParams.storeName(); final QueryableStoreType queryableStoreType = storeQueryParams.queryableStoreType(); - final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.partition()); if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } @@ -58,12 +51,12 @@ public List stores(final StoreQueryParameters storeQueryParams) { if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) { final Map tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap(); final List stores = new ArrayList<>(); - if (keyTaskId != null) { - final Task task = tasks.get(keyTaskId); + if (storeQueryParams.partition() != null) { + final Task task = findTask(tasks, storeName, storeQueryParams.partition()); if (task == null) { return Collections.emptyList(); } - final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId); + final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, task.id()); if (store != null) { return Collections.singletonList(store); } @@ -104,19 +97,14 @@ private T validateAndListStores(final StateStore store, final QueryableStore } } - private TaskId createKeyTaskId(final String storeName, final Integer partition) { - if (partition == null) { - return null; - } - final List sourceTopics = internalTopologyBuilder.stateStoreNameToSourceTopics().get(storeName); - final Set sourceTopicsSet = new HashSet<>(sourceTopics); - final Map topicGroups = internalTopologyBuilder.topicGroups(); - for (final Map.Entry topicGroup : topicGroups.entrySet()) { - if (topicGroup.getValue().sourceTopics.containsAll(sourceTopicsSet)) { - 
return new TaskId(topicGroup.getKey(), partition); - } - } - throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the requested partition " + - partition + " is not available on this instance"); + private Task findTask(final Map tasks, final String storeName, final int partition) { + return tasks.entrySet().stream(). + filter(entry -> + entry.getKey().partition == partition && entry.getValue().getStore(storeName) != null). + filter(entry -> + storeName.equals(entry.getValue().getStore(storeName).name())). + findFirst(). + map(Map.Entry::getValue). + orElse(null); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 9dac5341d150d..24945755edbe6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -161,7 +161,7 @@ public void before() { tasks.put(new TaskId(0, 1), taskTwo); threadMock = EasyMock.createNiceMock(StreamThread.class); - provider = new StreamThreadStateStoreProvider(threadMock, internalTopologyBuilder); + provider = new StreamThreadStateStoreProvider(threadMock); } diff --git a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java index bc0e33a3a5699..9d89ae2082ccb 100644 --- a/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java +++ b/streams/src/test/java/org/apache/kafka/test/StateStoreProviderStub.java @@ -39,7 +39,7 @@ public class StateStoreProviderStub extends StreamThreadStateStoreProvider { private final int defaultStorePartition = 0; public StateStoreProviderStub(final boolean throwException) { - super(null, null); + super(null); this.throwException = 
throwException; } From a15d28f518bdd008b393d9f280341da6fb564b56 Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Wed, 15 Jul 2020 14:45:12 +0300 Subject: [PATCH 03/15] Find task in-place, avoid the synchronized call to topicGroups() --- .../StreamThreadStateStoreProvider.java | 13 +++--- .../internals/WrappingStoreProviderTest.java | 44 ++++++++++++------- 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index 78d55a888b40b..25e7a39697463 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -52,11 +52,11 @@ public List stores(final StoreQueryParameters storeQueryParams) { final Map tasks = storeQueryParams.staleStoresEnabled() ? 
streamThread.allTasks() : streamThread.activeTaskMap(); final List stores = new ArrayList<>(); if (storeQueryParams.partition() != null) { - final Task task = findTask(tasks, storeName, storeQueryParams.partition()); - if (task == null) { + final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition()); + if (streamTask == null) { return Collections.emptyList(); } - final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, task.id()); + final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); if (store != null) { return Collections.singletonList(store); } @@ -97,11 +97,10 @@ private T validateAndListStores(final StateStore store, final QueryableStore } } - private Task findTask(final Map tasks, final String storeName, final int partition) { + private Task findStreamTask(final Map tasks, final String storeName, final int partition) { return tasks.entrySet().stream(). - filter(entry -> - entry.getKey().partition == partition && entry.getValue().getStore(storeName) != null). - filter(entry -> + filter(entry -> entry.getKey().partition == partition && + entry.getValue().getStore(storeName) != null && storeName.equals(entry.getValue().getStore(storeName).name())). findFirst(). map(Map.Entry::getValue). 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java index ceb3f798d9e1a..189704819c8f9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/WrappingStoreProviderTest.java @@ -39,26 +39,24 @@ public class WrappingStoreProviderTest { private WrappingStoreProvider wrappingStoreProvider; + private final int numStateStorePartitions = 2; + @Before public void before() { final StateStoreProviderStub stubProviderOne = new StateStoreProviderStub(false); final StateStoreProviderStub stubProviderTwo = new StateStoreProviderStub(false); - - stubProviderOne.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"), - Serdes.serdeFrom(String.class), - Serdes.serdeFrom(String.class)) - .build()); - stubProviderOne.addStore("window", new NoOpWindowStore()); - stubProviderTwo.addStore("kv", Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"), - Serdes.serdeFrom(String.class), - Serdes.serdeFrom(String.class)) - .build()); - stubProviderTwo.addStore("window", new NoOpWindowStore()); - wrappingStoreProvider = new WrappingStoreProvider( - Arrays.asList(stubProviderOne, stubProviderTwo), - StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore()) - ); + for (int partition = 0; partition < numStateStorePartitions; partition++) { + stubProviderOne.addStore("kv", partition, Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("kv"), + Serdes.serdeFrom(String.class), + Serdes.serdeFrom(String.class)) + .build()); + stubProviderOne.addStore("window", partition, new NoOpWindowStore()); + wrappingStoreProvider = new WrappingStoreProvider( + Arrays.asList(stubProviderOne, stubProviderTwo), + StoreQueryParameters.fromNameAndType("kv", 
QueryableStoreTypes.keyValueStore()) + ); + } } @Test @@ -82,4 +80,20 @@ public void shouldThrowInvalidStoreExceptionIfNoStoreOfTypeFound() { wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("doesn't exist", QueryableStoreTypes.keyValueStore())); wrappingStoreProvider.stores("doesn't exist", QueryableStoreTypes.keyValueStore()); } + + @Test + public void shouldReturnAllStoreWhenQueryWithoutPartition() { + wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore())); + final List> results = + wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore()); + assertEquals(numStateStorePartitions, results.size()); + } + + @Test + public void shouldReturnSingleStoreWhenQueryWithPartition() { + wrappingStoreProvider.setStoreQueryParameters(StoreQueryParameters.fromNameAndType("kv", QueryableStoreTypes.keyValueStore()).withPartition(numStateStorePartitions - 1)); + final List> results = + wrappingStoreProvider.stores("kv", QueryableStoreTypes.keyValueStore()); + assertEquals(1, results.size()); + } } \ No newline at end of file From c1802752ed407e8a4e2a0ff00ea792b1466b3906 Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Wed, 15 Jul 2020 14:50:17 +0300 Subject: [PATCH 04/15] Find task in-place, avoid the synchronized call to topicGroups() --- .../internals/StreamThreadStateStoreProvider.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index 25e7a39697463..651340d188fad 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -50,22 +50,19 @@ public List stores(final 
StoreQueryParameters storeQueryParams) { final StreamThread.State state = streamThread.state(); if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) { final Map tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap(); - final List stores = new ArrayList<>(); if (storeQueryParams.partition() != null) { final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition()); if (streamTask == null) { return Collections.emptyList(); } + final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); + return store != null ? Collections.singletonList(store) : Collections.emptyList(); + } + final List stores = new ArrayList<>(); + for (final Task streamTask : tasks.values()) { final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); if (store != null) { - return Collections.singletonList(store); - } - } else { - for (final Task streamTask : tasks.values()) { - final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); - if (store != null) { - stores.add(store); - } + stores.add(store); } } return stores; From 4209ce29861c1fea6cfe6628ed54da7e772e7f8f Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Wed, 15 Jul 2020 18:54:45 +0300 Subject: [PATCH 05/15] Find task in-place, avoid the synchronized call to topicGroups() --- .../internals/StreamThreadStateStoreProvider.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index 651340d188fad..ddd8d3c8a56e6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java 
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -27,10 +27,11 @@ import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedWindowStore; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; public class StreamThreadStateStoreProvider { @@ -58,14 +59,10 @@ public List stores(final StoreQueryParameters storeQueryParams) { final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); return store != null ? Collections.singletonList(store) : Collections.emptyList(); } - final List stores = new ArrayList<>(); - for (final Task streamTask : tasks.values()) { - final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); - if (store != null) { - stores.add(store); - } - } - return stores; + return tasks.values().stream(). + map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). + filter(Objects::nonNull). 
+ collect(Collectors.toList()); } else { throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " + state + ", not RUNNING" + From f5f2e46fbd9ec3f39c6c1f140d220a6052fdccd4 Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Wed, 15 Jul 2020 20:01:00 +0300 Subject: [PATCH 06/15] remove extra loops over all stores of all providers as a sanity check before returning the WrappingStoreProvider --- .../internals/QueryableStoreProvider.java | 20 ------------------- .../internals/WrappingStoreProvider.java | 10 ++++++++-- 2 files changed, 8 insertions(+), 22 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java index 2af5874af341e..8dd1f032cd19f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/QueryableStoreProvider.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.StoreQueryParameters; -import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.state.QueryableStoreType; @@ -56,25 +55,6 @@ public T getStore(final StoreQueryParameters storeQueryParameters) { if (!globalStore.isEmpty()) { return queryableStoreType.create(globalStoreProvider, storeName); } - final List allStores = new ArrayList<>(); - for (final StreamThreadStateStoreProvider storeProvider : storeProviders) { - final List stores = storeProvider.stores(storeQueryParameters); - if (!stores.isEmpty()) { - allStores.addAll(stores); - if (storeQueryParameters.partition() != null) { - break; - } - } - } - if (allStores.isEmpty()) { - if (storeQueryParameters.partition() != null) { - throw new InvalidStateStoreException( - String.format("The 
specified partition %d for store %s does not exist.", - storeQueryParameters.partition(), - storeName)); - } - throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance."); - } return queryableStoreType.create( new WrappingStoreProvider(storeProviders, storeQueryParameters), storeName diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java index 3deb2d37ababf..26c5db0e192fd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java @@ -46,8 +46,8 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet public List stores(final String storeName, final QueryableStoreType queryableStoreType) { final List allStores = new ArrayList<>(); - for (final StreamThreadStateStoreProvider provider : storeProviders) { - final List stores = provider.stores(storeQueryParameters); + for (final StreamThreadStateStoreProvider storeProvider : storeProviders) { + final List stores = storeProvider.stores(storeQueryParameters); if (!stores.isEmpty()) { allStores.addAll(stores); if (storeQueryParameters.partition() != null) { @@ -56,6 +56,12 @@ public List stores(final String storeName, } } if (allStores.isEmpty()) { + if (storeQueryParameters.partition() != null) { + throw new InvalidStateStoreException( + String.format("The specified partition %d for store %s does not exist.", + storeQueryParameters.partition(), + storeName)); + } throw new InvalidStateStoreException("The state store, " + storeName + ", may have migrated to another instance."); } return allStores; From b7d38c6ed55dae174de77de429cb94fa1c26344c Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Wed, 15 Jul 2020 20:30:28 +0300 Subject: [PATCH 07/15] remove extra loops 
over all stores of all providers as a sanity check before returning the WrappingStoreProvider --- .../state/internals/QueryableStoreProviderTest.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java index 2d047556f9c82..f2ca0c0298afb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/QueryableStoreProviderTest.java @@ -60,12 +60,12 @@ public void before() { @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionIfKVStoreDoesntExist() { - storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())); + storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())).get("1"); } @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionIfWindowStoreDoesntExist() { - storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore())); + storeProvider.getStore(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis()); } @Test @@ -80,12 +80,12 @@ public void shouldReturnWindowStoreWhenItExists() { @Test(expected = InvalidStateStoreException.class) public void shouldThrowExceptionWhenLookingForWindowStoreWithDifferentType() { - storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore())); + storeProvider.getStore(StoreQueryParameters.fromNameAndType(windowStore, QueryableStoreTypes.keyValueStore())).get("1"); } @Test(expected = InvalidStateStoreException.class) public void 
shouldThrowExceptionWhenLookingForKVStoreWithDifferentType() { - storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore())); + storeProvider.getStore(StoreQueryParameters.fromNameAndType(keyValueStore, QueryableStoreTypes.windowStore())).fetch("1", System.currentTimeMillis()); } @Test @@ -106,7 +106,7 @@ public void shouldThrowExceptionWhenKVStoreWithPartitionDoesntExists() { storeProvider.getStore( StoreQueryParameters .fromNameAndType(keyValueStore, QueryableStoreTypes.keyValueStore()) - .withPartition(partition)) + .withPartition(partition)).get("1") ); assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, keyValueStore))); } @@ -123,7 +123,7 @@ public void shouldThrowExceptionWhenWindowStoreWithPartitionDoesntExists() { storeProvider.getStore( StoreQueryParameters .fromNameAndType(windowStore, QueryableStoreTypes.windowStore()) - .withPartition(partition)) + .withPartition(partition)).fetch("1", System.currentTimeMillis()) ); assertThat(thrown.getMessage(), equalTo(String.format("The specified partition %d for store %s does not exist.", partition, windowStore))); } From 89ac008bbe0763619a68d5cb455f73ec1aee54be Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Thu, 16 Jul 2020 19:58:24 +0300 Subject: [PATCH 08/15] refactor if/else flow to eliminate early-return --- .../StreamThreadStateStoreProvider.java | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index ddd8d3c8a56e6..f12ae9657d28b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ 
-27,11 +27,11 @@ import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedWindowStore; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.stream.Collectors; public class StreamThreadStateStoreProvider { @@ -51,18 +51,22 @@ public List stores(final StoreQueryParameters storeQueryParams) { final StreamThread.State state = streamThread.state(); if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) { final Map tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap(); + final List stores = new ArrayList<>(); if (storeQueryParams.partition() != null) { final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition()); - if (streamTask == null) { - return Collections.emptyList(); + if (streamTask != null) { + final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); + if (store != null) { + stores.add(store); + } } - final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); - return store != null ? Collections.singletonList(store) : Collections.emptyList(); + } else { + tasks.values().stream(). + map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). + filter(Objects::nonNull). + forEach(stores::add); } - return tasks.values().stream(). - map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). - filter(Objects::nonNull). 
- collect(Collectors.toList()); + return Collections.unmodifiableList(stores); } else { throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " + state + ", not RUNNING" + From 158db402af5cfbbffeba297d3d9d3811ea75a979 Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Thu, 16 Jul 2020 21:03:51 +0300 Subject: [PATCH 09/15] concise if/else flow into functional --- .../StreamThreadStateStoreProvider.java | 29 +++++++++---------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index f12ae9657d28b..01f0fed2bb8e4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -27,11 +27,12 @@ import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedWindowStore; -import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; public class StreamThreadStateStoreProvider { @@ -51,22 +52,19 @@ public List stores(final StoreQueryParameters storeQueryParams) { final StreamThread.State state = streamThread.state(); if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) { final Map tasks = storeQueryParams.staleStoresEnabled() ? 
streamThread.allTasks() : streamThread.activeTaskMap(); - final List stores = new ArrayList<>(); if (storeQueryParams.partition() != null) { - final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition()); - if (streamTask != null) { - final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); - if (store != null) { - stores.add(store); - } - } + return findStreamTask(tasks, storeName, storeQueryParams.partition()). + map(streamTask -> + validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). + map(Collections::singletonList). + orElse(Collections.emptyList()); } else { - tasks.values().stream(). - map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). + return tasks.values().stream(). + map(streamTask -> + validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). filter(Objects::nonNull). - forEach(stores::add); + collect(Collectors.toList()); } - return Collections.unmodifiableList(stores); } else { throw new InvalidStateStoreException("Cannot get state store " + storeName + " because the stream thread is " + state + ", not RUNNING" + @@ -95,13 +93,12 @@ private T validateAndListStores(final StateStore store, final QueryableStore } } - private Task findStreamTask(final Map tasks, final String storeName, final int partition) { + private Optional findStreamTask(final Map tasks, final String storeName, final int partition) { return tasks.entrySet().stream(). filter(entry -> entry.getKey().partition == partition && entry.getValue().getStore(storeName) != null && storeName.equals(entry.getValue().getStore(storeName).name())). findFirst(). - map(Map.Entry::getValue). 
- orElse(null); + map(Map.Entry::getValue); } } From b22417fee013cb18e19ad96d4f2d7c0845cf3452 Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Fri, 17 Jul 2020 11:53:50 +0300 Subject: [PATCH 10/15] optimize performance - avoid creating intermediate active tasks map --- .../internals/StreamThreadStateStoreProvider.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index 01f0fed2bb8e4..3a4a161216563 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -33,6 +33,7 @@ import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.Stream; public class StreamThreadStateStoreProvider { @@ -51,15 +52,19 @@ public List stores(final StoreQueryParameters storeQueryParams) { } final StreamThread.State state = streamThread.state(); if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) { - final Map tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap(); + final Stream> taskStream = storeQueryParams.staleStoresEnabled() ? + streamThread.allTasks().entrySet().stream() : + streamThread.allTasks().entrySet().stream().filter(entry -> entry.getValue().isActive()); + if (storeQueryParams.partition() != null) { - return findStreamTask(tasks, storeName, storeQueryParams.partition()). + return findStreamTask(taskStream, storeName, storeQueryParams.partition()). map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). map(Collections::singletonList). 
orElse(Collections.emptyList()); } else { - return tasks.values().stream(). + return taskStream. + map(Map.Entry::getValue). map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). filter(Objects::nonNull). @@ -93,8 +98,8 @@ private T validateAndListStores(final StateStore store, final QueryableStore } } - private Optional findStreamTask(final Map tasks, final String storeName, final int partition) { - return tasks.entrySet().stream(). + private Optional findStreamTask(final Stream> taskStream, final String storeName, final int partition) { + return taskStream. filter(entry -> entry.getKey().partition == partition && entry.getValue().getStore(storeName) != null && storeName.equals(entry.getValue().getStore(storeName).name())). From da8554cff2f30fcb00f748cd81e7d676098850c9 Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Fri, 17 Jul 2020 12:06:55 +0300 Subject: [PATCH 11/15] optimize performance - avoid creating intermediate active tasks map --- .../StreamThreadStateStoreProvider.java | 26 ++++++++----------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java index 3a4a161216563..d5a175d9e518d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java @@ -29,11 +29,10 @@ import java.util.Collections; import java.util.List; -import java.util.Map; +import java.util.Collection; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; -import java.util.stream.Stream; public class StreamThreadStateStoreProvider { @@ -52,19 +51,17 @@ public List stores(final StoreQueryParameters storeQueryParams) { } final 
StreamThread.State state = streamThread.state(); if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) { - final Stream> taskStream = storeQueryParams.staleStoresEnabled() ? - streamThread.allTasks().entrySet().stream() : - streamThread.allTasks().entrySet().stream().filter(entry -> entry.getValue().isActive()); + final Collection tasks = storeQueryParams.staleStoresEnabled() ? + streamThread.allTasks().values() : streamThread.activeTasks(); if (storeQueryParams.partition() != null) { - return findStreamTask(taskStream, storeName, storeQueryParams.partition()). + return findStreamTask(tasks, storeName, storeQueryParams.partition()). map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). map(Collections::singletonList). orElse(Collections.emptyList()); } else { - return taskStream. - map(Map.Entry::getValue). + return tasks.stream(). map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). filter(Objects::nonNull). @@ -98,12 +95,11 @@ private T validateAndListStores(final StateStore store, final QueryableStore } } - private Optional findStreamTask(final Stream> taskStream, final String storeName, final int partition) { - return taskStream. - filter(entry -> entry.getKey().partition == partition && - entry.getValue().getStore(storeName) != null && - storeName.equals(entry.getValue().getStore(storeName).name())). - findFirst(). - map(Map.Entry::getValue); + private Optional findStreamTask(final Collection tasks, final String storeName, final int partition) { + return tasks.stream(). + filter(streamTask -> streamTask.id().partition == partition && + streamTask.getStore(storeName) != null && + storeName.equals(streamTask.getStore(storeName).name())). 
+ findFirst(); } } From fca67e47561ac63fb15c1fd13f3c19c3bead496e Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Fri, 17 Jul 2020 13:45:51 +0300 Subject: [PATCH 12/15] [TESTS] remove expensive double-iteration from state provider --- .../streams/integration/StreamStreamJoinIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java index c503a98c3b1f5..bf0a9d92ab38a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java @@ -94,7 +94,7 @@ public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedExceptio kafkaStreams.start(); latch.await(); - assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore()))); + assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore())).get("1")); } } From 9838391304ff1cb3d1eab7b9d4f0126c57a66a1b Mon Sep 17 00:00:00 2001 From: albert02lowis Date: Sat, 15 Aug 2020 19:27:36 +0300 Subject: [PATCH 13/15] =?UTF-8?q?KAFKA-9273:=20Extract=20testShouldAutoShu?= =?UTF-8?q?tdownOnIncompleteMetadata=20from=20S=E2=80=A6=20(#9108)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The main goal is to remove usage of embedded broker (EmbeddedKafkaCluster) in AbstractJoinIntegrationTest and its subclasses. This is because the tests under this class are no longer using the embedded broker, except for two. testShouldAutoShutdownOnIncompleteMetadata is one of such tests. 
Furthermore, this test does not actually perform stream-table join; it is testing an edge case of joining with a non-existent topic, so it should be in a separate test. Testing strategy: run existing unit and integration test Reviewers: Boyang Chen , Bill Bejeck --- .../StreamStreamJoinIntegrationTest.java | 28 ------------------- 1 file changed, 28 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java index 8732c248c1ba0..67db9bde266a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java @@ -60,34 +60,6 @@ public void prepareTopology() throws InterruptedException { rightStream = builder.stream(INPUT_TOPIC_RIGHT); } - @Test - public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedException { - STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-no-store-access"); - final StreamsBuilder builder = new StreamsBuilder(); - - final KStream left = builder.stream(INPUT_TOPIC_LEFT, Consumed.with(Serdes.String(), Serdes.Integer())); - final KStream right = builder.stream(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.String(), Serdes.Integer())); - final CountDownLatch latch = new CountDownLatch(1); - - left.join( - right, - (value1, value2) -> value1 + value2, - JoinWindows.of(ofMillis(100)), - StreamJoined.with(Serdes.String(), Serdes.Integer(), Serdes.Integer()).withStoreName("join-store")); - - try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), STREAMS_CONFIG)) { - kafkaStreams.setStateListener((newState, oldState) -> { - if (newState == KafkaStreams.State.RUNNING) { - latch.countDown(); - } - }); - - kafkaStreams.start(); - latch.await(); - assertThrows(InvalidStateStoreException.class, () -> 
kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore())).get("1")); - } - } - @Test public void testInner() { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner"); From 73def50d582d4c90bc5a53fed7463aa0a3b0d0d0 Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Mon, 5 Oct 2020 18:02:59 +0300 Subject: [PATCH 14/15] fix test to throw exception on store request --- .../kafka/streams/integration/JoinStoreIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java index c519117fabe20..e15788a89bf6a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java @@ -108,7 +108,7 @@ public void shouldNotAccessJoinStoresWhenGivingName() throws InterruptedExceptio kafkaStreams.start(); latch.await(); - assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore()))); + assertThrows(InvalidStateStoreException.class, () -> kafkaStreams.store(StoreQueryParameters.fromNameAndType("join-store", QueryableStoreTypes.keyValueStore())).get(1)); } } } From 702cc065a2a1e723e3c84268d02983025c144f51 Mon Sep 17 00:00:00 2001 From: "dima.reznik" Date: Wed, 7 Oct 2020 15:10:16 +0300 Subject: [PATCH 15/15] waiting state store readiness by probing non-existing value --- .../kafka/streams/integration/EosBetaUpgradeIntegrationTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java index 
cd57acb65b72f..d039d98e6995e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosBetaUpgradeIntegrationTest.java @@ -1080,6 +1080,7 @@ private Set keysFromInstance(final KafkaStreams streams) throws Exception streams, StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore()) ); + waitForCondition(() -> store.get(-1L) == null, MAX_WAIT_TIME_MS, () -> "State store did not ready: " + storeName); final Set keys = new HashSet<>(); try (final KeyValueIterator it = store.all()) { while (it.hasNext()) {