Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ public File directoryForTask(final TaskId taskId) {
return taskDir;
}

File stateDir() {
return stateDir;
}

/**
* Get or create the directory for the global stores.
* @return directory for the global stores
Expand Down Expand Up @@ -141,7 +145,7 @@ synchronized boolean lock(final TaskId taskId) throws IOException {
}

try {
lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
lockFile = new File(stateDir, taskId + LOCK_FILE_NAME);
} catch (final ProcessorStateException e) {
// directoryForTask could be throwing an exception if another thread
// has concurrently deleted the directory
Expand Down Expand Up @@ -222,6 +226,10 @@ synchronized void unlock(final TaskId taskId) throws IOException {
final FileChannel fileChannel = channels.remove(taskId);
if (fileChannel != null) {
fileChannel.close();
final File lockFile = new File(stateDir, taskId + LOCK_FILE_NAME);
if (!lockFile.delete()) {
log.debug("{} was not deleted", lockFile.toString());
}
}
}
}
Expand Down Expand Up @@ -331,7 +339,8 @@ public boolean accept(final File pathname) {
private FileChannel getOrCreateFileChannel(final TaskId taskId,
final Path lockPath) throws IOException {
if (!channels.containsKey(taskId)) {
channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE));
channels.put(taskId, FileChannel.open(lockPath, StandardOpenOption.CREATE,
StandardOpenOption.WRITE));
}
return channels.get(taskId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,12 @@ public void shouldCreateTaskStateDirectory() {
@Test
public void shouldLockTaskStateDirectory() throws IOException {
final TaskId taskId = new TaskId(0, 0);
final File taskDirectory = directory.directoryForTask(taskId);

directory.lock(taskId);

try (
final FileChannel channel = FileChannel.open(
new File(taskDirectory, StateDirectory.LOCK_FILE_NAME).toPath(),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we remove this test? Seems, we should update the FileChannel here to use the new lock file name?

new File(appDir, taskId + StateDirectory.LOCK_FILE_NAME).toPath(),
StandardOpenOption.CREATE, StandardOpenOption.WRITE)
) {
channel.tryLock();
Expand Down Expand Up @@ -140,17 +139,18 @@ public void shouldNotLockDeletedDirectory() throws IOException {
@Test
public void shouldLockMulitpleTaskDirectories() throws IOException {
final TaskId taskId = new TaskId(0, 0);
final File task1Dir = directory.directoryForTask(taskId);
final File task1Dir = directory.stateDir();
final TaskId taskId2 = new TaskId(1, 0);
final File task2Dir = directory.directoryForTask(taskId2);
final File task2Dir = directory.stateDir();


try (
final FileChannel channel1 = FileChannel.open(
new File(task1Dir, StateDirectory.LOCK_FILE_NAME).toPath(),
new File(task1Dir, taskId + StateDirectory.LOCK_FILE_NAME).toPath(),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE);
final FileChannel channel2 = FileChannel.open(new File(task2Dir, StateDirectory.LOCK_FILE_NAME).toPath(),
final FileChannel channel2 = FileChannel.open(
new File(task2Dir, taskId2 + StateDirectory.LOCK_FILE_NAME).toPath(),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE)
) {
Expand Down Expand Up @@ -201,10 +201,9 @@ public void shouldCleanUpTaskStateDirectoriesThatAreNotCurrentlyLocked() throws
time.sleep(1000);
directory.cleanRemovedTasks(0);

files = Arrays.asList(appDir.listFiles());
files = Arrays.asList(directory.stateDir().listFiles());
// lock files have been cleaned
assertEquals(2, files.size());
assertTrue(files.contains(new File(appDir, task0.toString())));
assertTrue(files.contains(new File(appDir, task1.toString())));
} finally {
directory.unlock(task0);
directory.unlock(task1);
Expand Down Expand Up @@ -358,4 +357,4 @@ public void shouldCleanupAllTaskDirectoriesIncludingGlobalOne() {
files = Arrays.asList(appDir.listFiles());
assertEquals(0, files.size());
}
}
}