Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion docs/api-reference/sql-ingestion-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,16 @@ The response shows an example report for a query.
"status": "SUCCESS",
"startTime": "2022-09-14T22:12:09.266Z",
"durationMs": 28227,
"workers": {
"0": [
{
"workerId": "query-3dc0c45d-34d7-4b15-86c9-cdb2d3ebfc4e-worker0_0",
"state": "SUCCESS",
"durationMs": 15511,
"pendingMs": 137
}
]
},
"pendingTasks": 0,
"runningTasks": 2,
"segmentLoadStatus": {
Expand Down Expand Up @@ -607,7 +617,8 @@ The following table describes the response fields when you retrieve a report for
| `multiStageQuery.payload.status.workers.<workerNumber>` | Array of worker tasks including retries. |
| `multiStageQuery.payload.status.workers.<workerNumber>[].workerId` | ID of the worker task. |
| `multiStageQuery.payload.status.workers.<workerNumber>[].state` | RUNNING, SUCCESS, or FAILED. |
| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` | Milliseconds elapsed after the worker task started running. It is -1 for worker tasks with status RUNNING.|
| `multiStageQuery.payload.status.workers.<workerNumber>[].durationMs` | Milliseconds elapsed between when the worker task was first requested and when it finished. It is -1 for worker tasks with status RUNNING.|
| `multiStageQuery.payload.status.workers.<workerNumber>[].pendingMs` | Milliseconds elapsed between when the worker task was first requested and when it fully started RUNNING. Actual work time can be calculated using `actualWorkTimeMS = durationMs - pendingMs`.|
| `multiStageQuery.payload.status.pendingTasks` | Number of tasks that are not fully started. -1 denotes that the number is currently unknown. |
| `multiStageQuery.payload.status.runningTasks` | Number of currently running tasks. Should be at least 1 since the controller is included. |
| `multiStageQuery.payload.status.segmentLoadStatus` | Segment loading container. Only present after the segments have been published. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ public void reportFailedInactiveWorker(int workerNumber)
* Blocks the call until the worker tasks are ready to be contacted for work.
*
* @param workerSet
*
* @throws InterruptedException
*/
public void waitUntilWorkersReady(Set<Integer> workerSet) throws InterruptedException
Expand Down Expand Up @@ -353,45 +352,6 @@ public boolean isTaskLatest(String taskId)
}
}

/**
 * Statistics for one worker task attempt: its task id, its state and its duration.
 *
 * Exposed as JSON properties {@code workerId}, {@code state} and {@code durationMs}.
 */
public static class WorkerStats
{
  String workerId;
  TaskState state;
  long duration;

  /**
   * For JSON deserialization only
   */
  public WorkerStats()
  {
  }

  /**
   * @param workerId id of the worker task
   * @param state    state of the worker task
   * @param duration duration in milliseconds, exposed as {@code durationMs}
   */
  public WorkerStats(String workerId, TaskState state, long duration)
  {
    this.duration = duration;
    this.state = state;
    this.workerId = workerId;
  }

  /**
   * @return id of the worker task
   */
  @JsonProperty
  public String getWorkerId()
  {
    return this.workerId;
  }

  /**
   * @return state of the worker task
   */
  @JsonProperty
  public TaskState getState()
  {
    return this.state;
  }

  /**
   * @return duration in milliseconds
   */
  @JsonProperty("durationMs")
  public long getDuration()
  {
    return this.duration;
  }
}

public Map<Integer, List<WorkerStats>> getWorkerStats()
{
final Map<Integer, List<WorkerStats>> workerStats = new TreeMap<>();
Expand All @@ -400,14 +360,17 @@ public Map<Integer, List<WorkerStats>> getWorkerStats()

TaskTracker taskTracker = taskEntry.getValue();

TaskStatus taskStatus = taskTracker.statusRef.get();
workerStats.computeIfAbsent(taskTracker.workerNumber, k -> new ArrayList<>())
.add(new WorkerStats(taskEntry.getKey(),
taskTracker.status.getStatusCode(),
// getDuration() returns -1 for running tasks.
// It's not calculated on-the-fly here since
// taskTracker.startTimeMillis marks task
// submission time rather than the actual start.
taskTracker.status.getDuration()
.add(new WorkerStats(
taskEntry.getKey(),
taskStatus.getStatusCode(),
// getDuration() returns -1 for running tasks.
// It's not calculated on-the-fly here since
// taskTracker.startTimeMillis marks task
// submission time rather than the actual start.
taskStatus.getDuration(),
taskTracker.taskPendingTimeInMs()
));
}

Expand Down Expand Up @@ -576,18 +539,20 @@ private void updateTaskTrackersAndTaskIds()
for (Map.Entry<String, TaskStatus> statusEntry : statuses.entrySet()) {
final String taskId = statusEntry.getKey();
final TaskTracker tracker = taskTrackers.get(taskId);
tracker.status = statusEntry.getValue();
tracker.updateStatus(statusEntry.getValue());
TaskStatus status = tracker.statusRef.get();

if (!tracker.status.getStatusCode().isComplete() && tracker.unknownLocation()) {
if (!status.getStatusCode().isComplete() && tracker.unknownLocation()) {
// Look up location if not known. Note: this location is not used to actually contact the task. For that,
// we have SpecificTaskServiceLocator. This location is only used to determine if a task has started up.
tracker.initialLocation = workerManager.location(taskId);
tracker.setLocation(workerManager.location(taskId));
}

if (tracker.status.getStatusCode() == TaskState.RUNNING && !tracker.unknownLocation()) {
if (status.getStatusCode() == TaskState.RUNNING && !tracker.unknownLocation()) {
synchronized (taskIds) {
if (fullyStartedTasks.add(tracker.workerNumber)) {
recentFullyStartedWorkerTimeInMillis.set(System.currentTimeMillis());
tracker.setFullyStartedTime(System.currentTimeMillis());
}
taskIds.notifyAll();
}
Expand Down Expand Up @@ -616,7 +581,7 @@ private void checkForErroneousTasks()
continue;
}

if (tracker.status == null) {
if (tracker.statusRef.get() == null) {
removeWorkerFromFullyStartedWorkers(tracker);
final String errorMessage = StringUtils.format("Task [%s] status missing", taskId);
log.info(errorMessage + ". Trying to relaunch the worker");
Expand All @@ -635,9 +600,10 @@ private void checkForErroneousTasks()
));
} else if (tracker.didFail() && !canceledWorkerTasks.contains(taskId)) {
removeWorkerFromFullyStartedWorkers(tracker);
log.info("Task[%s] failed because %s. Trying to relaunch the worker", taskId, tracker.status.getErrorMsg());
TaskStatus taskStatus = tracker.statusRef.get();
log.info("Task[%s] failed because %s. Trying to relaunch the worker", taskId, taskStatus.getErrorMsg());
tracker.enableRetrying();
retryTask.retry(tracker.msqWorkerTask, new WorkerFailedFault(taskId, tracker.status.getErrorMsg()));
retryTask.retry(tracker.msqWorkerTask, new WorkerFailedFault(taskId, taskStatus.getErrorMsg()));
}
}
}
Expand Down Expand Up @@ -717,15 +683,15 @@ private void checkRelaunchLimitsOrThrow(TaskTracker tracker, MSQWorkerTask relau
Limits.PER_WORKER_RELAUNCH_LIMIT,
relaunchTask.getId(),
relaunchTask.getWorkerNumber(),
tracker.status.getErrorMsg()
tracker.statusRef.get().getErrorMsg()
));
}
if (currentRelaunchCount > Limits.TOTAL_RELAUNCH_LIMIT) {
throw new MSQException(new TooManyAttemptsForJob(
Limits.TOTAL_RELAUNCH_LIMIT,
currentRelaunchCount,
relaunchTask.getId(),
tracker.status.getErrorMsg()
tracker.statusRef.get().getErrorMsg()
));
}
}
Expand All @@ -737,8 +703,9 @@ private void shutDownTasks()
for (final Map.Entry<String, TaskTracker> taskEntry : taskTrackers.entrySet()) {
final String taskId = taskEntry.getKey();
final TaskTracker tracker = taskEntry.getValue();
if (!canceledWorkerTasks.contains(taskId)
&& (tracker.status == null || !tracker.status.getStatusCode().isComplete())) {
if ((!canceledWorkerTasks.contains(taskId))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this looks a bit weird due to the parenthesis and new lines.

Suggested change
if ((!canceledWorkerTasks.contains(taskId))
if (!canceledWorkerTasks.contains(taskId) && !tracker.isComplete()) {

Copy link
Copy Markdown
Contributor Author

@cryptoe cryptoe Feb 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally find 2 not statements a little tricky so hence like the parenthesis. Keeping it as is for now.

&&
(!tracker.isComplete())) {
canceledWorkerTasks.add(taskId);
context.workerManager().cancel(taskId);
}
Expand Down Expand Up @@ -831,11 +798,12 @@ private class TaskTracker
{
private final int workerNumber;
private final long startTimeMillis = System.currentTimeMillis();
private final AtomicLong taskFullyStartedTimeRef = new AtomicLong();
private final MSQWorkerTask msqWorkerTask;
private TaskStatus status;
private TaskLocation initialLocation;
private final AtomicReference<TaskStatus> statusRef = new AtomicReference<>();
private final AtomicReference<TaskLocation> initialLocationRef = new AtomicReference<>();

private boolean isRetrying = false;
private final AtomicBoolean isRetryingRef = new AtomicBoolean(false);

public TaskTracker(int workerNumber, MSQWorkerTask msqWorkerTask)
{
Expand All @@ -845,16 +813,19 @@ public TaskTracker(int workerNumber, MSQWorkerTask msqWorkerTask)

/**
 * Tells whether a usable location has been recorded for this task yet.
 *
 * @return true when no location has been set, or when the recorded location equals
 * {@code TaskLocation.unknown()}
 */
public boolean unknownLocation()
{
  final TaskLocation recorded = initialLocationRef.get();
  if (recorded == null) {
    return true;
  }
  return TaskLocation.unknown().equals(recorded);
}

/**
 * Tells whether the most recently recorded status is a complete one.
 *
 * @return false when no status has been recorded yet; otherwise whether the status code is complete
 */
public boolean isComplete()
{
  final TaskStatus latest = statusRef.get();
  if (latest == null) {
    return false;
  }
  return latest.getStatusCode().isComplete();
}

/**
 * Tells whether the most recently recorded status is a failure.
 *
 * @return false when no status has been recorded yet; otherwise whether the status code is a failure
 */
public boolean didFail()
{
  final TaskStatus latest = statusRef.get();
  if (latest == null) {
    return false;
  }
  return latest.getStatusCode().isFailure();
}

Expand All @@ -869,6 +840,7 @@ public boolean didFail()
public boolean didRunTimeOut(final long maxTaskStartDelayMillis)
{
long currentTimeMillis = System.currentTimeMillis();
TaskStatus status = statusRef.get();
return (status == null || status.getStatusCode() == TaskState.RUNNING)
&& unknownLocation()
&& currentTimeMillis - recentFullyStartedWorkerTimeInMillis.get() > maxTaskStartDelayMillis
Expand All @@ -880,17 +852,87 @@ && unknownLocation()
*/
public void enableRetrying()
{
isRetrying = true;
isRetryingRef.set(true);
}

/**
* Checks if the task is retrying.
*
* @return
*/
public boolean isRetrying()
{
return isRetrying;
return isRetryingRef.get();
}

/**
 * Records the location of this task, replacing any previously recorded value.
 *
 * @param taskLocation the location to store
 */
public void setLocation(TaskLocation taskLocation)
{
initialLocationRef.set(taskLocation);
}

/**
 * Records the latest known status of this task, replacing any previously recorded value.
 *
 * @param taskStatus the status to store
 */
public void updateStatus(TaskStatus taskStatus)
{
statusRef.set(taskStatus);
}

/**
 * Records the timestamp at which this task became fully started.
 *
 * @param currentTimeMillis the timestamp to store, in milliseconds
 */
public void setFullyStartedTime(long currentTimeMillis)
{
taskFullyStartedTimeRef.set(currentTimeMillis);
}

/**
 * Returns how long this task has been pending, in milliseconds.
 *
 * While no fully-started time has been recorded (the stored value is still 0), the pending time is
 * measured from {@code startTimeMillis} to the current time. Once a fully-started time has been
 * recorded, the pending time is the difference between that time and {@code startTimeMillis}.
 *
 * The result is never negative. {@link System#currentTimeMillis()} is wall-clock time and can step
 * backwards, so both cases are clamped at zero.
 *
 * @return the pending time in milliseconds, at least 0
 */
public long taskPendingTimeInMs()
{
  final long fullyStartedTime = taskFullyStartedTimeRef.get();
  // A stored value of 0 means no fully-started time has been recorded, so the task is still pending.
  final long pendingUntil = fullyStartedTime == 0 ? System.currentTimeMillis() : fullyStartedTime;
  return Math.max(0, pendingUntil - startTimeMillis);
}
}

/**
 * Statistics for one worker task attempt: its task id, its state, its duration and the time it
 * spent pending.
 *
 * Exposed as JSON properties {@code workerId}, {@code state}, {@code durationMs} and
 * {@code pendingMs}.
 */
public static class WorkerStats
{
  String workerId;
  TaskState state;
  long duration;
  long pendingTimeInMs;

  /**
   * For JSON deserialization only
   */
  public WorkerStats()
  {
  }

  /**
   * @param workerId        id of the worker task
   * @param state           state of the worker task
   * @param duration        duration in milliseconds, exposed as {@code durationMs}
   * @param pendingTimeInMs pending time in milliseconds, exposed as {@code pendingMs}
   */
  public WorkerStats(String workerId, TaskState state, long duration, long pendingTimeInMs)
  {
    this.pendingTimeInMs = pendingTimeInMs;
    this.duration = duration;
    this.state = state;
    this.workerId = workerId;
  }

  /**
   * @return id of the worker task
   */
  @JsonProperty
  public String getWorkerId()
  {
    return this.workerId;
  }

  /**
   * @return state of the worker task
   */
  @JsonProperty
  public TaskState getState()
  {
    return this.state;
  }

  /**
   * @return duration in milliseconds
   */
  @JsonProperty("durationMs")
  public long getDuration()
  {
    return this.duration;
  }

  /**
   * @return pending time in milliseconds
   */
  @JsonProperty("pendingMs")
  public long getPendingTimeInMs()
  {
    return this.pendingTimeInMs;
  }
}
}