From 3b37ab5def2bfa52601e6c1605c6193b38084772 Mon Sep 17 00:00:00 2001 From: Robert Yokota Date: Thu, 10 May 2018 14:01:37 -0700 Subject: [PATCH] Improve err msg when connecting processor with global store --- .../streams/processor/internals/InternalTopologyBuilder.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index b1d60a9ad886c..bfe8cda7da133 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -712,6 +712,10 @@ private void addGlobalStore(final String sourceName, private void connectProcessorAndStateStore(final String processorName, final String stateStoreName) { + if (globalStateStores.containsKey(stateStoreName)) { + throw new TopologyException("Global StateStore " + stateStoreName + + " can be used by a Processor without being specified; it should not be explicitly passed."); + } if (!stateFactories.containsKey(stateStoreName)) { throw new TopologyException("StateStore " + stateStoreName + " is not added yet."); }