From b484904a74458dd20d3b7b0deb26f822feca9140 Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Tue, 11 Apr 2017 12:49:28 -0700 Subject: [PATCH 1/4] Catch exception --- .../streams/processor/internals/StateDirectory.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index d264b2606172c..b081e277d036a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -95,11 +95,18 @@ public File globalStateDir() { * @throws IOException */ public boolean lock(final TaskId taskId, int retry) throws IOException { + final File lockFile; // we already have the lock so bail out here if (locks.containsKey(taskId)) { return true; } - final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); + try { + lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); + } catch (ProcessorStateException e) { + // directoryForTask could be throwing an exception if another thread + // has concurrently deleted the directory + return false; + } final FileChannel channel; From 21af5cc50b1a7e18230b6ff817997a1e82c71c97 Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Tue, 11 Apr 2017 13:43:42 -0700 Subject: [PATCH 2/4] Unit test --- .../internals/StateDirectoryTest.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index fb55796a45558..6b1d0773928af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -32,6 +33,7 @@ import java.util.List; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -97,6 +99,22 @@ public void shouldBeTrueIfAlreadyHoldsLock() throws Exception { assertTrue(directory.lock(taskId, 0)); } + @Test(expected = ProcessorStateException.class) + public void shouldThrowProcessorStateException() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + Utils.delete(stateDir); + directory.directoryForTask(taskId); + } + + @Test + public void shouldNotLockDeletedDirectory() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + Utils.delete(stateDir); + assertFalse(directory.lock(taskId, 0)); + } + @Test public void shouldLockMulitpleTaskDirectories() throws Exception { From bcb7b8f20ef74eb4410bf402837fcf90e42aea19 Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Wed, 12 Apr 2017 04:08:52 -0700 Subject: [PATCH 3/4] Ismael's comment --- .../processor/internals/ProcessorStateManager.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 65831a24d7183..9cf6088e0a4c4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -82,7 +82,13 @@ public ProcessorStateManager(final TaskId taskId, final Map storeToChangelogTopic) throws LockException, IOException { this.taskId = taskId; this.stateDirectory = stateDirectory; - this.baseDir = stateDirectory.directoryForTask(taskId); + this.logPrefix = String.format("task [%s]", taskId); + try { + this.baseDir = stateDirectory.directoryForTask(taskId); + } catch (ProcessorStateException e) { + throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s", + logPrefix, taskId, e)); + } this.partitionForTopic = new HashMap<>(); for (TopicPartition source : sources) { this.partitionForTopic.put(source.topic(), source); @@ -96,7 +102,7 @@ public ProcessorStateManager(final TaskId taskId, this.restoreCallbacks = isStandby ? new HashMap() : null; this.storeToChangelogTopic = storeToChangelogTopic; - this.logPrefix = String.format("task [%s]", taskId); + if (!stateDirectory.lock(taskId, 5)) { throw new LockException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath())); From 9f7a7dd0b284ec1e992bc65f42eb6b7d2b927af1 Mon Sep 17 00:00:00 2001 From: Eno Thereska Date: Wed, 12 Apr 2017 04:29:57 -0700 Subject: [PATCH 4/4] Ismael's comments --- .../internals/ProcessorStateManager.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 9cf6088e0a4c4..2ef9634218859 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -83,12 +83,6 @@ public ProcessorStateManager(final TaskId taskId, this.taskId = taskId; this.stateDirectory = stateDirectory; this.logPrefix = String.format("task [%s]", taskId); - try { - this.baseDir = stateDirectory.directoryForTask(taskId); - } catch (ProcessorStateException e) { - throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s", - logPrefix, taskId, e)); - } this.partitionForTopic = new HashMap<>(); for (TopicPartition source : sources) { this.partitionForTopic.put(source.topic(), source); @@ -102,10 +96,18 @@ public ProcessorStateManager(final TaskId taskId, this.restoreCallbacks = isStandby ? new HashMap() : null; this.storeToChangelogTopic = storeToChangelogTopic; - - if (!stateDirectory.lock(taskId, 5)) { - throw new LockException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath())); + throw new LockException(String.format("%s Failed to lock the state directory for task %s", + logPrefix, taskId)); + } + // get a handle on the parent/base directory of the task directory + // note that the parent directory could have been accidentally deleted here, + // so catch that exception if that is the case + try { + this.baseDir = stateDirectory.directoryForTask(taskId); + } catch (ProcessorStateException e) { + throw new LockException(String.format("%s Failed to get the directory for task %s. Exception %s", + logPrefix, taskId, e)); } // load the checkpoint information