Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -101,35 +101,29 @@ void execute(final String key, Runnable task) {

protected abstract void execute(String key, Deque<Runnable> finalTasks);

/** Cancels every task in the queue assoicated with {@code key}. */
void cancelQueuedTasks(final String key, Throwable e) {
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
// so that no more tasks are scheduled.
synchronized (tasksByKey) {
final Deque<Runnable> tasks = tasksByKey.get(key);
if (tasks == null) {
return;
}
while (!tasks.isEmpty()) {
Runnable task = tasks.poll();
if (task instanceof CancellableRunnable) {
((CancellableRunnable) task).cancel(e);
} else {
logger.log(
Level.WARNING,
"Attempted to cancel Runnable that was not CancellableRunnable; ignored.");
}
}
}
}

protected void invokeCallback(final Deque<Runnable> tasks) {
// TODO(kimkyung-goog): Check if there is a race when task list becomes empty.
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
}
}

private static class AutoExecutor extends SequentialExecutor {
AutoExecutor(Executor executor) {
super(executor);
}

protected void execute(final String key, final Deque<Runnable> finalTasks) {
executor.execute(
new Runnable() {
@Override
public void run() {
invokeCallbackAndExecuteNext(key, finalTasks);
}
});
}

protected void invokeCallbackAndExecuteNext(final String key, final Deque<Runnable> tasks) {
invokeCallback(tasks);
Expand All @@ -153,22 +147,6 @@ public void run() {
}
}

private static class AutoExecutor extends SequentialExecutor {
AutoExecutor(Executor executor) {
super(executor);
}

protected void execute(final String key, final Deque<Runnable> finalTasks) {
executor.execute(
new Runnable() {
@Override
public void run() {
invokeCallbackAndExecuteNext(key, finalTasks);
}
});
}
}

private static class CallbackExecutor extends SequentialExecutor {
CallbackExecutor(Executor executor) {
super(executor);
Expand Down Expand Up @@ -255,4 +233,26 @@ public void run() {
});
}
}

/** Cancels every task in the queue assoicated with {@code key}. */
void cancelQueuedTasks(final String key, Throwable e) {
// TODO(kimkyung-goog): Ensure execute() fails once cancelQueueTasks() has been ever invoked,
// so that no more tasks are scheduled.
synchronized (tasksByKey) {
final Deque<Runnable> tasks = tasksByKey.get(key);
if (tasks == null) {
return;
}
while (!tasks.isEmpty()) {
Runnable task = tasks.poll();
if (task instanceof CancellableRunnable) {
((CancellableRunnable) task).cancel(e);
} else {
logger.log(
Level.WARNING,
"Attempted to cancel Runnable that was not CancellableRunnable; ignored.");
}
}
}
}
}