Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected AbstractTask(TaskId id,

// create the processor state manager
try {
this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());

} catch (IOException e) {
throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ public class ProcessorStateManager implements StateManager {
* (this might be recoverable by retrying)
* @throws IOException if any severe error happens while creating or locking the state directory
*/
public ProcessorStateManager(final String applicationId,
final TaskId taskId,
public ProcessorStateManager(final TaskId taskId,
final Collection<TopicPartition> sources,
final Consumer<byte[], byte[]> restoreConsumer,
final boolean isStandby,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void cleanup() {
public void testNoTopic() throws IOException {
MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);

ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, new HashMap<String, String>() {
ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, new HashMap<String, String>() {
{
put(nonPersistentStoreName, nonPersistentStoreName);
}
Expand Down Expand Up @@ -244,7 +244,7 @@ public void testRegisterPersistentStore() throws IOException {

MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store

ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
{
put(persistentStoreName, persistentStoreTopicName);
put(nonPersistentStoreName, nonPersistentStoreName);
Expand Down Expand Up @@ -298,7 +298,7 @@ public void testRegisterNonPersistentStore() throws IOException {

MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store

ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
{
put(persistentStoreName, persistentStoreTopicName);
put(nonPersistentStoreName, nonPersistentStoreTopicName);
Expand Down Expand Up @@ -381,7 +381,7 @@ public void testChangeLogOffsets() throws IOException {
// if there is an source partition, inherit the partition id
Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));

ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby
ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby
try {
restoreConsumer.reset();

Expand Down Expand Up @@ -415,7 +415,7 @@ public void testGetStore() throws IOException {

MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);

ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
try {
stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);

Expand Down Expand Up @@ -453,7 +453,7 @@ public void testFlushAndClose() throws IOException {
MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);

ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
{
put(persistentStoreName, persistentStoreTopicName);
put(nonPersistentStoreName, nonPersistentStoreTopicName);
Expand Down Expand Up @@ -491,7 +491,7 @@ public void testFlushAndClose() throws IOException {
@Test
public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.<String, String>emptyMap());
ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.<String, String>emptyMap());
stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback);
assertNotNull(stateMgr.getStore(nonPersistentStoreName));
}
Expand All @@ -512,7 +512,7 @@ public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws Exception {


final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
final ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());

restoreConsumer.reset();
stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);
Expand Down