Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public ProcessorStateManager(final TaskId taskId,
final Map<String, String> storeToChangelogTopic) throws LockException, IOException {
this.taskId = taskId;
this.stateDirectory = stateDirectory;
this.baseDir = stateDirectory.directoryForTask(taskId);
this.logPrefix = String.format("task [%s]", taskId);
this.partitionForTopic = new HashMap<>();
for (TopicPartition source : sources) {
this.partitionForTopic.put(source.topic(), source);
Expand All @@ -96,10 +96,18 @@ public ProcessorStateManager(final TaskId taskId,
this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
this.storeToChangelogTopic = storeToChangelogTopic;

this.logPrefix = String.format("task [%s]", taskId);

if (!stateDirectory.lock(taskId, 5)) {
throw new LockException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath()));
throw new LockException(String.format("%s Failed to lock the state directory for task %s",
logPrefix, taskId));
}
// get a handle on the parent/base directory of the task directory
// note that the parent directory could have been accidentally deleted here,
// so catch that exception if that is the case
try {
this.baseDir = stateDirectory.directoryForTask(taskId);
} catch (ProcessorStateException e) {
throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s",
logPrefix, taskId, e));
}

// load the checkpoint information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,18 @@ public File globalStateDir() {
* @throws IOException
*/
public boolean lock(final TaskId taskId, int retry) throws IOException {
final File lockFile;
// we already have the lock so bail out here
if (locks.containsKey(taskId)) {
return true;
}
final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
try {
lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it OK that we call directoryForTask with no protection in ProcessorStateManager before we call lock?

        this.baseDir  = stateDirectory.directoryForTask(taskId);
        this.partitionForTopic = new HashMap<>();
        for (TopicPartition source : sources) {
            this.partitionForTopic.put(source.topic(), source);
        }
        this.stores = new LinkedHashMap<>();
        this.globalStores = new HashMap<>();
        this.offsetLimits = new HashMap<>();
        this.restoredOffsets = new HashMap<>();
        this.isStandby = isStandby;
        this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
        this.storeToChangelogTopic = storeToChangelogTopic;

        this.logPrefix = String.format("task [%s]", taskId);

        if (!stateDirectory.lock(taskId, 5)) {
            throw new LockException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath()));
        }

Copy link
Copy Markdown
Contributor Author

@enothereska enothereska Apr 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch @ijuma . In this case crashing is probably the right thing to do, however I've now caught the exception and propagated further so at least we know why we're crashing.

} catch (ProcessorStateException e) {
// directoryForTask could be throwing an exception if another thread
// has concurrently deleted the directory
return false;
}

final FileChannel channel;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
Expand All @@ -32,6 +33,7 @@
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -97,6 +99,22 @@ public void shouldBeTrueIfAlreadyHoldsLock() throws Exception {
assertTrue(directory.lock(taskId, 0));
}

@Test(expected = ProcessorStateException.class)
public void shouldThrowProcessorStateException() throws Exception {
final TaskId taskId = new TaskId(0, 0);

Utils.delete(stateDir);
directory.directoryForTask(taskId);
}

@Test
public void shouldNotLockDeletedDirectory() throws Exception {
final TaskId taskId = new TaskId(0, 0);

Utils.delete(stateDir);
assertFalse(directory.lock(taskId, 0));
}


@Test
public void shouldLockMulitpleTaskDirectories() throws Exception {
Expand Down