Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1a15f53
udf: replace function
Aug 13, 2020
7d072b9
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 14, 2020
391158f
udf: replace function
Aug 14, 2020
5eb52e1
udf: replace function
Aug 14, 2020
a79ea91
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 18, 2020
86f841f
udf: replace function
Aug 18, 2020
ade1afa
udf: replace function
Aug 18, 2020
1d95a49
udf: replace function
Aug 18, 2020
433eb87
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 24, 2020
c62d239
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 25, 2020
0a8db8c
Merge remote-tracking branch 'upstream/master' into str_replace
Aug 25, 2020
02d3f86
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 1, 2020
41da0ba
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 8, 2020
de4e523
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 11, 2020
a863b0d
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 17, 2020
024c422
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 22, 2020
e1656c7
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 23, 2020
837e037
Merge remote-tracking branch 'upstream/master' into str_replace
Sep 27, 2020
a36d3e7
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 17, 2020
43cfa2b
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 23, 2020
641efb5
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 24, 2020
5066131
Merge remote-tracking branch 'upstream/master' into str_replace
Oct 26, 2020
c1e58d5
unused importe
Oct 26, 2020
e4405e4
cat exception
Oct 26, 2020
9145509
update
Oct 26, 2020
42c7c6d
update
Oct 26, 2020
f198949
Merge remote-tracking branch 'upstream/master' into catch_retry_submi…
Oct 27, 2020
a1a94cc
update
Oct 27, 2020
6f8e407
update
Nov 5, 2020
6fbf63c
update
Nov 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.RejectedExecutionException;

/**
* There are 3 steps in BrokerLoadJob: BrokerPendingTask, LoadLoadingTask, CommitAndPublishTxn.
Expand Down Expand Up @@ -155,6 +156,13 @@ private void onPendingTaskFinished(BrokerPendingTaskAttachment attachment) {
.build(), e);
cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage()), true, true);
return;
} catch (RejectedExecutionException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("database_id", dbId)
.add("error_msg", "the task queque is full.")
.build(), e);
cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_RUN_FAIL, e.getMessage()), true, true);
return;
}

loadStartTimestamp = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;

/**
* parent class of BrokerLoadJob and SparkLoadJob from load stmt
Expand Down Expand Up @@ -233,8 +234,13 @@ public void onTaskFailed(long taskId, FailMsg failMsg) {
loadTask.updateRetryInfo();
idToTasks.put(loadTask.getSignature(), loadTask);
// load id will be added to loadStatistic when executing this task
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
return;
try {
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
} catch (RejectedExecutionException e) {
unprotectedExecuteCancel(failMsg, true);
logFinalOperation();
return;
}
}
} finally {
writeUnlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

/**
* LoadScheduler will schedule the pending LoadJob which belongs to LoadManager.
Expand Down Expand Up @@ -99,6 +100,12 @@ private void process() throws InterruptedException {
.build(), e);
needScheduleJobs.put(loadJob);
return;
} catch (RejectedExecutionException e) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, loadJob.getId())
.add("error_msg", "Failed to submit etl job. Job queue is full.")
.build(), e);
loadJob.cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.ETL_SUBMIT_FAIL, e.getMessage()),
true, true);
}
}
}
Expand Down