Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 0 additions & 108 deletions sql/src/main/java/org/apache/druid/sql/calcite/rel/QueryMaker.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ public Sequence<Object[]> runQuery(final DruidQuery druidQuery)
return executeGroupBy(druidQuery, (GroupByQuery) query);
} else if (query instanceof ScanQuery) {
return executeScan(druidQuery, (ScanQuery) query);
} else if (query instanceof SelectQuery) {
return executeSelect(druidQuery, (SelectQuery) query);
} else {
throw new ISE("Cannot run query of class[%s]", query.getClass().getName());
}
Expand Down Expand Up @@ -182,112 +180,6 @@ private Sequence<Object[]> executeScan(
);
}

private Sequence<Object[]> executeSelect(
final DruidQuery druidQuery,
final SelectQuery baseQuery
)
{
Preconditions.checkState(druidQuery.getGrouping() == null, "grouping must be null");

final List<RelDataTypeField> fieldList = druidQuery.getOutputRowType().getFieldList();
final Integer limit = druidQuery.getLimitSpec() != null ? druidQuery.getLimitSpec().getLimit() : null;
final RowSignature outputRowSignature = druidQuery.getOutputRowSignature();

// Select is paginated, we need to make multiple queries.
final Sequence<Sequence<Object[]>> sequenceOfSequences = Sequences.simple(
new Iterable<Sequence<Object[]>>()
{
@Override
public Iterator<Sequence<Object[]>> iterator()
{
final AtomicBoolean morePages = new AtomicBoolean(true);
final AtomicReference<Map<String, Integer>> pagingIdentifiers = new AtomicReference<>();
final AtomicLong rowsRead = new AtomicLong();

// Each Sequence<Object[]> is one page.
return new Iterator<Sequence<Object[]>>()
{
@Override
public boolean hasNext()
{
return morePages.get();
}

@Override
public Sequence<Object[]> next()
{
final SelectQuery queryWithPagination = baseQuery.withPagingSpec(
new PagingSpec(
pagingIdentifiers.get(),
plannerContext.getPlannerConfig().getSelectThreshold(),
true
)
);

morePages.set(false);
final AtomicBoolean gotResult = new AtomicBoolean();

return Sequences.concat(
Sequences.map(
runQuery(queryWithPagination),
new Function<Result<SelectResultValue>, Sequence<Object[]>>()
{
@Override
public Sequence<Object[]> apply(final Result<SelectResultValue> result)
{
if (!gotResult.compareAndSet(false, true)) {
throw new ISE("WTF?! Expected single result from Select query but got multiple!");
}

pagingIdentifiers.set(result.getValue().getPagingIdentifiers());
final List<Object[]> retVals = new ArrayList<>();
for (EventHolder holder : result.getValue().getEvents()) {
morePages.set(true);
final Map<String, Object> map = holder.getEvent();
final Object[] retVal = new Object[fieldList.size()];
for (RelDataTypeField field : fieldList) {
final String outputName = outputRowSignature.getRowOrder().get(field.getIndex());
if (outputName.equals(ColumnHolder.TIME_COLUMN_NAME)) {
retVal[field.getIndex()] = coerce(
holder.getTimestamp().getMillis(),
field.getType().getSqlTypeName()
);
} else {
retVal[field.getIndex()] = coerce(
map.get(outputName),
field.getType().getSqlTypeName()
);
}
}
if (limit == null || rowsRead.incrementAndGet() <= limit) {
retVals.add(retVal);
} else {
morePages.set(false);
return Sequences.simple(retVals);
}
}

return Sequences.simple(retVals);
}
}
)
);
}

@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
}
);

return Sequences.concat(sequenceOfSequences);
}

@SuppressWarnings("unchecked")
private <T> Sequence<T> runQuery(Query<T> query)
{
Hook.QUERY_PLAN.run(query);
Expand Down