Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package com.baidu.hugegraph.task;

import java.util.Date;
import java.util.Set;
import java.util.concurrent.Callable;

import org.apache.tinkerpop.gremlin.structure.Transaction;
Expand All @@ -30,13 +31,29 @@
import com.baidu.hugegraph.HugeGraphParams;
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.Log;
import com.google.common.collect.ImmutableSet;

public abstract class TaskCallable<V> implements Callable<V> {

private static final Logger LOG = Log.logger(HugeTask.class);

private static final String ERROR_MAX_LEN = "Failed to commit changes: " +
"The max length of bytes is";
private static final String ERROR_COMMIT = "Failed to commit changes: ";
private static final Set<String> ERROR_MESSAGES = ImmutableSet.of(
/*
* "The max length of bytes is" exception message occurs when
* task input size exceeds TASK_INPUT_SIZE_LIMIT or task result size
* exceeds TASK_RESULT_SIZE_LIMIT
*/
"The max length of bytes is",
/*
* "Batch too large" exception message occurs when using
* cassandra store and task input size is in
* [batch_size_fail_threshold_in_kb, TASK_INPUT_SIZE_LIMIT) or
* task result size is in
* [batch_size_fail_threshold_in_kb, TASK_RESULT_SIZE_LIMIT)
*/
"Batch too large"
);

private HugeTask<V> task = null;
private HugeGraph graph = null;
Expand Down Expand Up @@ -98,7 +115,8 @@ protected void save() {
*/
LOG.error("Failed to save task with error \"{}\": {}",
e, task.asMap(false));
if (e.getMessage().contains(ERROR_MAX_LEN)) {
String message = e.getMessage();
if (message.contains(ERROR_COMMIT) && needSaveWithEx(message)) {
task.failSave(e);
this.graph().taskScheduler().save(task);
return;
Expand Down Expand Up @@ -138,6 +156,15 @@ public static <V> TaskCallable<V> fromClass(String className) {
}
}

private static boolean needSaveWithEx(String message) {
for (String error : ERROR_MESSAGES) {
if (message.contains(error)) {
return true;
}
}
return false;
}

public static <V> TaskCallable<V> empty(Exception e) {
return new TaskCallable<V>() {
@Override
Expand Down