Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ private boolean processBatch(AllocateRequestBatch requestBatch)
}

log.debug(
"Processing [%d] requests for batch [%s], queue time [%s].",
"Processing [%d] requests for batch[%s], queue time[%s].",
requestBatch.size(), requestKey, requestBatch.getQueueTime()
);

Expand All @@ -355,21 +355,21 @@ private boolean processBatch(AllocateRequestBatch requestBatch)

emitBatchMetric("task/action/batch/attempts", 1L, requestKey);
emitBatchMetric("task/action/batch/runTime", (System.currentTimeMillis() - startTimeMillis), requestKey);
log.info("Successfully processed [%d / %d] requests in batch [%s].", successCount, batchSize, requestKey);
log.debug("Successfully processed [%d / %d] requests in batch[%s].", successCount, batchSize, requestKey);

if (requestBatch.isEmpty()) {
return true;
}

// Requeue the batch only if used segments have changed
log.debug("There are [%d] failed requests in batch [%s].", requestBatch.size(), requestKey);
log.debug("There are [%d] failed requests in batch[%s].", requestBatch.size(), requestKey);
final Set<DataSegment> updatedUsedSegments = retrieveUsedSegments(requestKey);

if (updatedUsedSegments.equals(usedSegments)) {
log.warn(
"Completing [%d] failed requests in batch [%s] with null value as there"
"Completing [%d] failed requests in batch[%s] with null value as there"
+ " are conflicting segments. Cannot retry allocation until the set of"
+ " used segments overlapping the allocation interval [%s] changes.",
+ " used segments overlapping the allocation interval[%s] changes.",
size(), requestKey, requestKey.preferredAllocationInterval
);

Expand Down Expand Up @@ -452,9 +452,9 @@ private int allocateSegmentsForBatch(AllocateRequestBatch requestBatch, Set<Data
}

if (!requestsWithPartialOverlap.isEmpty()) {
log.info(
"Found [%d] requests in batch [%s] with row intervals that partially overlap existing segments."
+ " These cannot be processed until the set of used segments changes. Example request: [%s]",
log.warn(
"Found [%d] requests in batch[%s] with row intervals that partially overlap existing segments."
+ " These cannot be processed until the set of used segments changes. Example request[%s]",
requestsWithPartialOverlap.size(), requestBatch.key, requestsWithPartialOverlap.get(0)
);
}
Expand Down Expand Up @@ -485,7 +485,7 @@ private int allocateSegmentsForInterval(

final AllocateRequestKey requestKey = requestBatch.key;
log.debug(
"Trying allocation for [%d] requests, interval [%s] in batch [%s]",
"Trying allocation for [%d] requests, interval[%s] in batch[%s]",
requests.size(), tryInterval, requestKey
);

Expand Down Expand Up @@ -531,14 +531,14 @@ private void emitTaskMetric(String metric, long value, SegmentAllocateRequest re
{
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, request.getTask());
metricBuilder.setDimension("taskActionType", SegmentAllocateAction.TYPE);
metricBuilder.setDimension(DruidMetrics.TASK_ACTION_TYPE, SegmentAllocateAction.TYPE);
emitter.emit(metricBuilder.setMetric(metric, value));
}

private void emitBatchMetric(String metric, long value, AllocateRequestKey key)
{
final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
metricBuilder.setDimension("taskActionType", SegmentAllocateAction.TYPE);
metricBuilder.setDimension(DruidMetrics.TASK_ACTION_TYPE, SegmentAllocateAction.TYPE);
metricBuilder.setDimension(DruidMetrics.DATASOURCE, key.dataSource);
metricBuilder.setDimension(DruidMetrics.INTERVAL, key.preferredAllocationInterval.toString());
emitter.emit(metricBuilder.setMetric(metric, value));
Expand Down Expand Up @@ -651,15 +651,15 @@ void handleResult(SegmentAllocateResult result, SegmentAllocateRequest request)
emitTaskMetric("task/action/success/count", 1L, request);
requestToFuture.remove(request).complete(result.getSegmentId());
} else if (request.canRetry()) {
log.info(
"Allocation failed on attempt [%d] due to error [%s]. Can still retry action [%s].",
log.debug(
"Allocation failed on attempt [%d] due to error[%s]. Can still retry action[%s].",
request.getAttempts(), result.getErrorMessage(), request.getAction()
);
} else {
emitTaskMetric("task/action/failed/count", 1L, request);
log.error(
"Exhausted max attempts [%d] for allocation with latest error [%s]."
+ " Completing action [%s] with a null value.",
"Exhausted max attempts[%d] for allocation with latest error[%s]."
+ " Completing action[%s] with a null value.",
request.getAttempts(), result.getErrorMessage(), request.getAction()
);
requestToFuture.remove(request).complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ public List<SegmentAllocateResult> allocateSegments(
boolean reduceMetadataIO
)
{
log.info("Allocating [%d] segments for datasource [%s], interval [%s]", requests.size(), dataSource, interval);
log.debug("Allocating [%d] segments for datasource[%s], interval[%s]", requests.size(), dataSource, interval);
final boolean isTimeChunkLock = lockGranularity == LockGranularity.TIME_CHUNK;

final AllocationHolderList holderList = new AllocationHolderList(requests, interval);
Expand Down Expand Up @@ -1132,11 +1132,11 @@ private void unlock(final Task task, final Interval interval, @Nullable Integer

if (match) {
// Remove task from live list
log.info("Removing task[%s] from TaskLock[%s]", task.getId(), taskLock);
log.debug("Removing task[%s] from TaskLock[%s]", task.getId(), taskLock);
final boolean removed = taskLockPosse.removeTask(task);

if (taskLockPosse.isTasksEmpty()) {
log.info("TaskLock[%s] is now empty.", taskLock);
log.debug("TaskLock[%s] is now empty.", taskLock);
possesHolder.remove(taskLockPosse);
}
if (possesHolder.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ private void manageInternalCritical(
continue;
}
if (taskIsReady) {
log.info("Asking taskRunner to run: %s", task.getId());
log.info("Asking taskRunner to run task[%s]", task.getId());
runnerTaskFuture = taskRunner.run(task);
} else {
// Task.isReady() can internally lock intervals or segments.
Expand Down Expand Up @@ -469,7 +469,7 @@ private void manageInternalPostCritical(
// Kill tasks that shouldn't be running
final Set<String> tasksToKill = Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
if (!tasksToKill.isEmpty()) {
log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
log.info("Asking taskRunner to clean up [%,d] tasks.", tasksToKill.size());

// On large installations running several thousands of tasks,
// concatenating the list of known task ids can be computationally expensive.
Expand All @@ -483,7 +483,7 @@ private void manageInternalPostCritical(
taskRunner.shutdown(taskId, reason);
}
catch (Exception e) {
log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
log.warn(e, "TaskRunner failed to clean up task[%s].", taskId);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3222,7 +3222,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException

final boolean stopTasksEarly;
if (earlyStopTime != null && !earlyStopTime.isAfterNow()) {
log.info("Early stop requested, signalling tasks to complete.");
log.info("Early stop requested for supervisor[%s], signalling tasks to complete.", supervisorId);
earlyStopTime = null;
stopTasksEarly = true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private Runnable computeAndCollectLag()
}
log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue);
} else {
log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
log.debug("Supervisor[%s] is suspended, skipping lag collection", dataSource);
}
}
catch (Exception e) {
Expand Down Expand Up @@ -237,7 +237,7 @@ private int computeDesiredTaskCount(List<Long> lags)

int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
if (currentActiveTaskCount == actualTaskCountMax) {
log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
log.debug("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
dataSource
);
emitter.emit(metricBuilder
Expand All @@ -258,7 +258,7 @@ private int computeDesiredTaskCount(List<Long> lags)
int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
if (currentActiveTaskCount == actualTaskCountMin) {
log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].",
log.debug("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource[%s].",
dataSource
);
emitter.emit(metricBuilder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,29 @@
import java.util.List;

/**
* Contains dimension names used while emitting metrics.
*/
public class DruidMetrics
{
// Query dimensions
public static final String DATASOURCE = "dataSource";
public static final String TYPE = "type";
public static final String INTERVAL = "interval";
public static final String ID = "id";
public static final String SUBQUERY_ID = "subQueryId";
public static final String TASK_ID = "taskId";
public static final String GROUP_ID = "groupId";
public static final String STATUS = "status";
public static final String TASK_INGESTION_MODE = "taskIngestionMode";

public static final String PARTITIONING_TYPE = "partitioningType";

// task metrics
// Task dimensions
public static final String TASK_ID = "taskId";
public static final String GROUP_ID = "groupId";
public static final String TASK_TYPE = "taskType";
public static final String TASK_STATUS = "taskStatus";

// Ingestion dimensions
public static final String PARTITIONING_TYPE = "partitioningType";
public static final String TASK_INGESTION_MODE = "taskIngestionMode";
public static final String TASK_ACTION_TYPE = "taskActionType";
public static final String STREAM = "stream";

public static final String PARTITION = "partition";

public static final String TAGS = "tags";
Expand Down
5 changes: 0 additions & 5 deletions processing/src/main/java/org/apache/druid/query/Druids.java
Original file line number Diff line number Diff line change
Expand Up @@ -1065,9 +1065,4 @@ public static DataSourceMetadataQueryBuilder newDataSourceMetadataQueryBuilder()
{
return new DataSourceMetadataQueryBuilder();
}

public static FilteredDataSource filteredDataSource(DataSource base, DimFilter filter)
{
return FilteredDataSource.create(base, filter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,7 @@ private Map<SegmentCreateRequest, PendingSegmentRecord> createNewSegments(
if (createdSegment != null) {
pendingSegments.add(createdSegment.getId());
uniqueRequestToSegment.put(uniqueRequest, createdSegment);
log.info("Created new segment[%s]", createdSegment.getId());
log.debug("Created new segment[%s]", createdSegment.getId());
}
}

Expand All @@ -1270,7 +1270,6 @@ private Map<SegmentCreateRequest, PendingSegmentRecord> createNewSegments(
}
}

log.info("Created [%d] new segments for [%d] allocate requests.", uniqueRequestToSegment.size(), requests.size());
return createdSegments;
}

Expand Down