From 556c2140a954bc3abf387ef1904c6129c45cf098 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Mon, 16 Jan 2017 11:55:12 +0000 Subject: [PATCH] remove unused param --- .../processor/internals/AbstractTask.java | 2 +- .../internals/ProcessorStateManager.java | 3 +-- .../internals/ProcessorStateManagerTest.java | 16 ++++++++-------- 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 0730c68ca0c9f..55418d5b4e030 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -70,7 +70,7 @@ protected AbstractTask(TaskId id, // create the processor state manager try { - this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic()); + this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic()); } catch (IOException e) { throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index a21c3e8dc4f92..ad16c77c2a8ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -74,8 +74,7 @@ public class ProcessorStateManager implements StateManager { * (this might be recoverable by retrying) * @throws IOException if any severe error happens while creating or locking the state directory */ - public ProcessorStateManager(final String applicationId, - final TaskId taskId, + public ProcessorStateManager(final TaskId taskId, final Collection sources, final Consumer restoreConsumer, final boolean isStandby, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index de547230ae3f9..602601a17e731 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -212,7 +212,7 @@ public void cleanup() { public void testNoTopic() throws IOException { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, new HashMap() { + ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, new HashMap() { { put(nonPersistentStoreName, nonPersistentStoreName); } @@ -244,7 +244,7 @@ public void testRegisterPersistentStore() throws IOException { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { + ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { { put(persistentStoreName, persistentStoreTopicName); put(nonPersistentStoreName, nonPersistentStoreName); @@ -298,7 +298,7 @@ public void testRegisterNonPersistentStore() throws IOException { MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { + ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { { put(persistentStoreName, persistentStoreTopicName); put(nonPersistentStoreName, nonPersistentStoreTopicName); @@ -381,7 +381,7 @@ public void testChangeLogOffsets() throws IOException { // if there is an source partition, inherit the partition id Set sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1)); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby + ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby try { restoreConsumer.reset(); @@ -415,7 +415,7 @@ public void testGetStore() throws IOException { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); try { stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); @@ -453,7 +453,7 @@ public void testFlushAndClose() throws IOException { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { + ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { { put(persistentStoreName, persistentStoreTopicName); put(nonPersistentStoreName, nonPersistentStoreTopicName); @@ -491,7 +491,7 @@ public void testFlushAndClose() throws IOException { @Test public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.emptyMap()); stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback); assertNotNull(stateMgr.getStore(nonPersistentStoreName)); } @@ -512,7 +512,7 @@ public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws Exception { final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); - final ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); restoreConsumer.reset(); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);