diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 65831a24d7183..2ef9634218859 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -82,7 +82,7 @@ public ProcessorStateManager(final TaskId taskId, final Map storeToChangelogTopic) throws LockException, IOException { this.taskId = taskId; this.stateDirectory = stateDirectory; - this.baseDir = stateDirectory.directoryForTask(taskId); + this.logPrefix = String.format("task [%s]", taskId); this.partitionForTopic = new HashMap<>(); for (TopicPartition source : sources) { this.partitionForTopic.put(source.topic(), source); @@ -96,10 +96,18 @@ public ProcessorStateManager(final TaskId taskId, this.restoreCallbacks = isStandby ? new HashMap() : null; this.storeToChangelogTopic = storeToChangelogTopic; - this.logPrefix = String.format("task [%s]", taskId); - if (!stateDirectory.lock(taskId, 5)) { - throw new LockException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath())); + throw new LockException(String.format("%s Failed to lock the state directory for task %s", + logPrefix, taskId)); + } + // get a handle on the parent/base directory of the task directory + // note that the parent directory could have been accidentally deleted here, + // so catch that exception if that is the case + try { + this.baseDir = stateDirectory.directoryForTask(taskId); + } catch (ProcessorStateException e) { + throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s", + logPrefix, taskId, e)); } // load the checkpoint information diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index d264b2606172c..b081e277d036a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -95,11 +95,18 @@ public File globalStateDir() { * @throws IOException */ public boolean lock(final TaskId taskId, int retry) throws IOException { + final File lockFile; // we already have the lock so bail out here if (locks.containsKey(taskId)) { return true; } - final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); + try { + lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); + } catch (ProcessorStateException e) { + // directoryForTask could be throwing an exception if another thread + // has concurrently deleted the directory + return false; + } final FileChannel channel; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index fb55796a45558..6b1d0773928af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -32,6 +33,7 @@ import java.util.List; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -97,6 +99,22 @@ public void shouldBeTrueIfAlreadyHoldsLock() throws Exception { assertTrue(directory.lock(taskId, 0)); } + @Test(expected = ProcessorStateException.class) + public void shouldThrowProcessorStateException() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + Utils.delete(stateDir); + directory.directoryForTask(taskId); + } + + @Test + public void shouldNotLockDeletedDirectory() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + Utils.delete(stateDir); + assertFalse(directory.lock(taskId, 0)); + } + @Test public void shouldLockMulitpleTaskDirectories() throws Exception {