Description
Search before asking
- I had searched in the issues and found no similar issues.
Version
2.1.8
What's Wrong?
All export jobs suddenly failed to be created, and the problem persisted even after waiting for a day.
I therefore traced the program.
When an export job publishes a task, it calls addMemoryTask, which in turn calls tryPublishTask:
    public Long addMemoryTask(TransientTaskExecutor executor) throws JobException {
        Long taskId = executor.getId();
        taskExecutorMap.put(taskId, executor);
        disruptor.tryPublishTask(taskId);
        LOG.info("add memory task, taskId: {}", taskId);
        return taskId;
    }
    .........
    public void tryPublishTask(Long taskId) throws JobException {
        if (isClosed) {
            log.info("tryPublish failed, disruptor is closed, taskId: {}", taskId);
            return;
        }
        // We reserve two slots in the ring buffer
        // to prevent it from becoming stuck due to competition between producers and consumers.
        if (disruptor.getRingBuffer().hasAvailableCapacity(2)) {
            disruptor.publishEvent(TRANSLATOR, taskId, 0L, TaskType.TRANSIENT_TASK);
        } else {
            throw new JobException("There is not enough available capacity in the RingBuffer.");
        }
    }
The disruptor.getRingBuffer().hasAvailableCapacity(2) call above always returns false, even though taskExecutorMap is empty.
I traced the hasAvailableCapacity function and found that there might be a boundary bug in Disruptor.
    private boolean hasAvailableCapacity(int requiredCapacity, boolean doStore) {
        // nextValue = 3510
        long nextValue = this.nextValue;
        // wrapPoint = 3510 + 2 - 1024 = 2488
        long wrapPoint = nextValue + (long) requiredCapacity - (long) this.bufferSize;
        // cachedValue = 2487
        long cachedGatingSequence = this.cachedValue;
        if (wrapPoint > cachedGatingSequence || cachedGatingSequence > nextValue) {
            if (doStore) {
                this.cursor.setVolatile(nextValue);
            }
            // minSequence = 2487
            long minSequence = Util.getMinimumSequence(this.gatingSequences, nextValue);
            this.cachedValue = minSequence;
            // 2488 > 2487, return false
            if (wrapPoint > minSequence) {
                return false;
            }
        }
        return true;
    }
It is a bit unusual that gatingSequences contains a non-contiguous minimum value of 2487.
You can read the runtime values of the variables in the attachment.
0521 - stack.log
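To make the arithmetic in the trace easier to check, here is a minimal single-threaded sketch of the same comparison. It is not the Disruptor source: the class name CapacityCheckSketch and the flattened method signature are hypothetical, and the caching and volatile-store logic is left out. With the traced values, 3510 - 2487 = 1023 of the 1024 slots are still unconsumed, so only one slot is free and a request for two is refused. The check would therefore keep returning false for as long as the minimum gating sequence stays at 2487.

```java
// Simplified model of the capacity check traced above.
// Hypothetical names; the real method reads these values from fields.
final class CapacityCheckSketch {

    /**
     * Returns true if requiredCapacity slots can be claimed without
     * overwriting entries the slowest consumer has not processed yet.
     */
    static boolean hasAvailableCapacity(long nextValue, int requiredCapacity,
                                        int bufferSize, long minGatingSequence) {
        // Same formula as in the trace: nextValue + requiredCapacity - bufferSize.
        long wrapPoint = nextValue + requiredCapacity - bufferSize;
        // The trace returns false when wrapPoint > minSequence.
        return wrapPoint <= minGatingSequence;
    }

    public static void main(String[] args) {
        // Values from the trace: wrapPoint = 2488 > 2487.
        System.out.println(hasAvailableCapacity(3510, 2, 1024, 2487)); // false
        // Requesting a single slot: wrapPoint = 2487, not greater than 2487.
        System.out.println(hasAvailableCapacity(3510, 1, 1024, 2487)); // true
        // If the slowest gating sequence advanced by one, two slots would fit.
        System.out.println(hasAvailableCapacity(3510, 2, 1024, 2488)); // true
    }
}
```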
In addition to the situation above ......
If tasks keep being added while the queue is full, a null pointer exception (NPE) may be thrown. However, we found that this NPE has already been fixed in this PR.
Caused by: java.lang.NullPointerException
    at org.apache.doris.scheduler.manager.TransientTaskManager.cancelMemoryTask(TransientTaskManager.java:67) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.load.ExportJob.lambda$cancelExportTask$5(ExportJob.java:694) ~[doris-fe.jar:1.2-SNAPSHOT]
    at java.util.ArrayList.forEach(ArrayList.java:1259) ~[?:1.8.0_352]
    at org.apache.doris.load.ExportJob.cancelExportTask(ExportJob.java:692) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.load.ExportJob.updateExportJobState(ExportJob.java:668) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.load.ExportMgr.addExportJobAndRegisterTask(ExportMgr.java:127) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.nereids.trees.plans.commands.ExportCommand.run(ExportCommand.java:151) ~[doris-fe.jar:1.2-SNAPSHOT]
    at org.apache.doris.qe.StmtExecutor.executeByNereids(StmtExecutor.java:729) ~[doris-fe.jar:1.2-SNAPSHOT]
What You Expected?
Given that the current Disruptor implementation is overly complex and makes bug tracking challenging, and considering that this use case only involves a basic producer-consumer scenario, I recommend refactoring the logic to ensure the reliability of queue production and consumption.
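As a rough illustration of the kind of basic producer-consumer queue meant here, the sketch below uses java.util.concurrent.ArrayBlockingQueue. It is only an assumption about what a refactoring could look like, not code from Doris: the class SimpleTaskQueueSketch and its methods tryPublish and take are hypothetical names, and task execution is reduced to printing the task id.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a bounded task queue built on a standard BlockingQueue.
final class SimpleTaskQueueSketch {
    private final BlockingQueue<Long> queue;

    SimpleTaskQueueSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Non-blocking publish: returns false when the queue is full. */
    boolean tryPublish(long taskId) {
        return queue.offer(taskId);
    }

    /** Blocks until a task id is available. */
    long take() throws InterruptedException {
        return queue.take();
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleTaskQueueSketch tasks = new SimpleTaskQueueSketch(2);
        CountDownLatch done = new CountDownLatch(2);

        // Single consumer thread that drains the queue until interrupted.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    long id = tasks.take();
                    System.out.println("run task " + id);
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();

        tasks.tryPublish(1L);
        tasks.tryPublish(2L);

        // Wait for both tasks to be consumed, then stop the consumer.
        done.await(5, TimeUnit.SECONDS);
        consumer.interrupt();
    }
}
```

With this shape, a full queue shows up as a false return value from tryPublish that the caller can turn into a JobException, and the queue state (size, remaining capacity) can be inspected directly when debugging.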
How to Reproduce?
It only occurs occasionally in production; I am not sure how to reproduce it.
Anything Else?
Restarting the FE resolves the problem.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
