Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@
*/
public class MovingAverageQueryRunner implements QueryRunner<Row>
{

public static final String QUERY_FAIL_TIME = "queryFailTime";
public static final String QUERY_TOTAL_BYTES_GATHERED = "queryTotalBytesGathered";

private final QuerySegmentWalker walker;
private final RequestLogger requestLogger;

Expand Down Expand Up @@ -127,8 +123,11 @@ public Sequence<Row> run(QueryPlus<Row> query, ResponseContext responseContext)
GroupByQuery gbq = builder.build();

ResponseContext gbqResponseContext = ResponseContext.createEmpty();
gbqResponseContext.put(QUERY_FAIL_TIME, System.currentTimeMillis() + QueryContexts.getTimeout(gbq));
gbqResponseContext.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
gbqResponseContext.put(
ResponseContext.Key.QUERY_FAIL_DEADLINE_MILLIS,
System.currentTimeMillis() + QueryContexts.getTimeout(gbq)
);
gbqResponseContext.put(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());

Sequence<ResultRow> results = gbq.getRunner(walker).run(QueryPlus.wrap(gbq), gbqResponseContext);
try {
Expand Down Expand Up @@ -165,8 +164,11 @@ public Sequence<Row> run(QueryPlus<Row> query, ResponseContext responseContext)
maq.getContext()
);
ResponseContext tsqResponseContext = ResponseContext.createEmpty();
tsqResponseContext.put(QUERY_FAIL_TIME, System.currentTimeMillis() + QueryContexts.getTimeout(tsq));
tsqResponseContext.put(QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());
tsqResponseContext.put(
ResponseContext.Key.QUERY_FAIL_DEADLINE_MILLIS,
System.currentTimeMillis() + QueryContexts.getTimeout(tsq)
);
tsqResponseContext.put(ResponseContext.Key.QUERY_TOTAL_BYTES_GATHERED, new AtomicLong());

Sequence<Result<TimeseriesResultValue>> results = tsq.getRunner(walker).run(QueryPlus.wrap(tsq), tsqResponseContext);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void after(boolean isDone, Throwable thrown)
if (report) {
final long cpuTimeNs = cpuTimeAccumulator.get();
if (cpuTimeNs > 0) {
responseContext.add(ResponseContext.Key.CPU_CONSUMED_NANOS, cpuTimeNs);
queryWithMetrics.getQueryMetrics().reportCpuTime(cpuTimeNs).emit(emitter);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ public static ScanQueryBuilder copy(ScanQuery query)
.virtualColumns(query.getVirtualColumns())
.resultFormat(query.getResultFormat())
.batchSize(query.getBatchSize())
.limit(query.getLimit())
.limit(query.getScanRowsLimit())
.filters(query.getFilter())
.columns(query.getColumns())
.legacy(query.isLegacy())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.context.ResponseContext;

import java.util.ArrayList;
import java.util.List;
import java.util.Collections;

/**
*/
Expand All @@ -40,13 +39,7 @@ public ReportTimelineMissingSegmentQueryRunner(SegmentDescriptor descriptor)
@Override
public Sequence<T> run(QueryPlus<T> queryPlus, ResponseContext responseContext)
{
List<SegmentDescriptor> missingSegments =
(List<SegmentDescriptor>) responseContext.get(ResponseContext.CTX_MISSING_SEGMENTS);
if (missingSegments == null) {
missingSegments = new ArrayList<>();
responseContext.put(ResponseContext.CTX_MISSING_SEGMENTS, missingSegments);
}
missingSegments.add(descriptor);
responseContext.add(ResponseContext.Key.MISSING_SEGMENTS, Collections.singletonList(descriptor));
return Sequences.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulat
for (int i = 0; i < config.getNumTries(); i++) {
log.info("[%,d] missing segments found. Retry attempt [%,d]", missingSegments.size(), i);

context.put(ResponseContext.CTX_MISSING_SEGMENTS, new ArrayList<>());
context.put(ResponseContext.Key.MISSING_SEGMENTS, new ArrayList<>());
final QueryPlus<T> retryQueryPlus = queryPlus.withQuerySegmentSpec(
new MultipleSpecificSegmentSpec(
missingSegments
Expand Down Expand Up @@ -102,7 +102,7 @@ public <OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulat

private List<SegmentDescriptor> getMissingSegments(final ResponseContext context)
{
final Object maybeMissingSegments = context.get(ResponseContext.CTX_MISSING_SEGMENTS);
final Object maybeMissingSegments = context.get(ResponseContext.Key.MISSING_SEGMENTS);
if (maybeMissingSegments == null) {
return new ArrayList<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ public static ConcurrentResponseContext createEmpty()
return new ConcurrentResponseContext();
}

private final ConcurrentHashMap<String, Object> delegate = new ConcurrentHashMap<>();
private final ConcurrentHashMap<BaseKey, Object> delegate = new ConcurrentHashMap<>();

@Override
protected Map<String, Object> getDelegate()
protected Map<BaseKey, Object> getDelegate()
{
return delegate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ public static DefaultResponseContext createEmpty()
return new DefaultResponseContext();
}

private final HashMap<String, Object> delegate = new HashMap<>();
private final HashMap<BaseKey, Object> delegate = new HashMap<>();

@Override
protected Map<String, Object> getDelegate()
protected Map<BaseKey, Object> getDelegate()
{
return delegate;
}
Expand Down
Loading