Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,16 @@ private String logPrefix() {

/**
* Get the lock for the {@link TaskId}s directory if it is available
* @param taskId
* @param taskId the id of the task to get a lock on
* @return true if successful
* @throws IOException
* @throws IOException if lock could not be aquired
*/
synchronized boolean lock(final TaskId taskId) throws IOException {
return lock(taskId, false);
}

private synchronized boolean lock(final TaskId taskId,
final boolean throwOverlappingFileLockException) throws IOException {
if (!createStateDirectory) {
return true;
}
Expand Down Expand Up @@ -164,7 +169,12 @@ synchronized boolean lock(final TaskId taskId) throws IOException {
return false;
}

final FileLock lock = tryLock(channel);
final FileLock lock;
if (throwOverlappingFileLockException) {
lock = channel.tryLock();
} else {
lock = tryLock(channel);
}
if (lock != null) {
locks.put(taskId, new LockAndOwner(Thread.currentThread().getName(), lock));

Expand All @@ -174,10 +184,13 @@ synchronized boolean lock(final TaskId taskId) throws IOException {
}

synchronized boolean lockGlobalState() throws IOException {
return lockGlobalState(false);
}

private synchronized boolean lockGlobalState(final boolean throwOverlappingFileLockException) throws IOException {
if (!createStateDirectory) {
return true;
}

if (globalStateLock != null) {
log.trace("{} Found cached state dir lock for the global task", logPrefix());
return true;
Expand All @@ -193,7 +206,12 @@ synchronized boolean lockGlobalState() throws IOException {
// file, in this case we will return immediately indicating locking failed.
return false;
}
final FileLock fileLock = tryLock(channel);
final FileLock fileLock;
if (throwOverlappingFileLockException) {
fileLock = channel.tryLock();
} else {
fileLock = tryLock(channel);
}
if (fileLock == null) {
channel.close();
return false;
Expand Down Expand Up @@ -235,20 +253,32 @@ synchronized void unlock(final TaskId taskId) throws IOException {
}
}

@SuppressWarnings("ThrowFromFinallyBlock")
public synchronized void clean() {
try {
cleanRemovedTasks(0, true);
} catch (final Exception e) {
// this is already logged within cleanRemovedTasks
throw new StreamsException(e);
}

try {
if (stateDir.exists()) {
if (stateDir.exists() && lockGlobalState(true)) {
Utils.delete(globalStateDir().getAbsoluteFile());
}
} catch (final OverlappingFileLockException e) {
log.error("{} Failed to get the global state directory lock.", logPrefix(), e);
throw new StreamsException(e);
} catch (final IOException e) {
log.error("{} Failed to delete global state directory due to an unexpected exception", logPrefix(), e);
throw new StreamsException(e);
} finally {
try {
unlockGlobalState();
} catch (final IOException e) {
log.error("{} Failed to release global state directory lock.", logPrefix());
throw new StreamsException(e);
}
}
}

Expand All @@ -267,6 +297,7 @@ public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
}
}

@SuppressWarnings("ThrowFromFinallyBlock")
private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
final boolean manualUserCall) throws Exception {
final File[] taskDirs = listTaskDirectories();
Expand All @@ -279,7 +310,7 @@ private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
final TaskId id = TaskId.parse(dirName);
if (!locks.containsKey(id)) {
try {
if (lock(id)) {
if (lock(id, manualUserCall)) {
final long now = time.milliseconds();
final long lastModifiedMs = taskDir.lastModified();
if (now > lastModifiedMs + cleanupDelayMs || manualUserCall) {
Expand All @@ -302,11 +333,8 @@ private synchronized void cleanRemovedTasks(final long cleanupDelayMs,
}
}
} catch (final OverlappingFileLockException e) {
// locked by another thread
if (manualUserCall) {
log.error("{} Failed to get the state directory lock.", logPrefix(), e);
throw e;
}
log.error("{} Failed to get the state directory lock.", logPrefix(), e);
throw e;
} catch (final IOException e) {
log.error("{} Failed to delete the state directory.", logPrefix(), e);
if (manualUserCall) {
Expand Down
Loading