-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Fix bugs of Broker load #1546
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix bugs of Broker load #1546
Conversation
When broker load task failed and try, the load id should be changed, or the retry task will use same tablets channel on BE, which may cause some problem. And furthermore, I use the same TUniqueId for the query id and load id of a load plan, for tracing the load process more easily. Also add a BE config 'tablet_writer_rpc_timeout_sec' to config the timeout of add_batch rpc in load process.
be/src/exec/tablet_sink.h
Outdated
| // BE id -> execution time of add batch in us | ||
| std::unordered_map<int64_t, int64_t> _node_add_batch_time_map; | ||
| std::unordered_map<int64_t, int64_t> _node_add_batch_wait_time_map; | ||
| std::unordered_map<int64_t, int64_t> _node_add_batch_num_map; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use a struct to enclose these
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
| executeAfterAborted(txnState); | ||
| // cancel load job | ||
| executeCancel(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, txnStatusChangeReason), false); | ||
| unprotectedExecuteCancel(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, txnStatusChangeReason), false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clear idToTask
| olapTableSink.updateLoadId(loadId); | ||
| } | ||
|
|
||
| LOG.info("update olap table sink's load id to {}, job: {}", DebugUtil.printId(loadId), loadJobId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete this debug log
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Link loadId and jobId is necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Delete this debug log
No, this is not a debug log. We can not find the new load id without this bug
| totalFileNum += fileStatuses.size(); | ||
| LOG.info("get {} files to in file group {}. size: {}. job: {}", | ||
| fileStatuses.size(), groupNum, groupFileSize, callback.getCallbackId()); | ||
| groupNum++; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The number of group is not meaningful. Maybe id of table is better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will add a table id here
| return gson.toJson(partitionIdToOffset); | ||
| } | ||
|
|
||
| private TExecPlanFragmentParams updateTExecPlanFragmentParams(RoutineLoadJob routineLoadJob) throws UserException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The better name of method is replan.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change it to rePlan()
| olapTableSink.updateLoadId(loadId); | ||
| } | ||
|
|
||
| LOG.info("update olap table sink's load id to {}, job: {}", DebugUtil.printId(loadId), loadJobId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Link loadId and jobId is necessary.
be/test/exec/tablet_sink_test.cpp
Outdated
| if (!doris::config::init(conffile.c_str(), false)) { | ||
| fprintf(stderr, "error read config file. \n"); | ||
| return -1; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can set config value in SetUp(), otherwise you can only run this test in run-ut.sh
Use same UUID as query ID and load ID of a load execution plan. Each load execution plan has a load ID, and as a plan, there is also a query ID. We can use same UUID as query ID and load ID, for tracing the load process more easily. Change the load ID when retrying a load execution plan. When a load execution plan retry, the load ID should be changed, otherwise BE can not distinguish the old and new load requests. Cancel the running loading task when cancelling the broker load. When user cancel a broker load, the running loading task should also be cancelled, or it may occupies the worker thread for a long time. Remove the unnecessary query report when doing load execution plan. Only the last query report is needed. Add a new BE config tablet_writer_rpc_timeout_sec. It is used for RPC of tablet sink. The default is 600 seconds. which is long enough for flushing about 6GB data. The long timeout config will reduce the possibility of encountering fail to send batch error when loading. Use streaming_load_max_mb instead of mini_load_max_mb in BE config. Add more logs for tracing a broker load process easily.
Co-authored-by: morningman <morningman@163.com>
Use same UUID as query ID and load ID of a load execution plan.
Each load execution plan has a load ID, and as a plan, there is also a query ID.
We can use same UUID as query ID and load ID, for tracing the load process more easily.
Change the load ID when retrying a load execution plan.
When a load execution plan retry, the load ID should be changed, otherwise BE can not
distinguish the old and new load requests.
Cancel the running loading task when cancelling the broker load.
When user cancel a broker load, the running loading task should also be cancelled, or
it may occupies the worker thread for a long time.
Remove the unnecessary query report when doing load execution plan.
Only the last query report is needed.
Add a new BE config
tablet_writer_rpc_timeout_sec.It is used for RPC of tablet sink. The default is 600 seconds. which is long enough for flushing
about 6GB data. The long timeout config will reduce the possibility of encountering
fail to send batcherror when loading.Use
streaming_load_max_mbinstead ofmini_load_max_mbin BE config.Add more logs for tracing a broker load process easily.