Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.java.util.common.ISE;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.QueryInterruptedException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.groupby.orderby.DefaultLimitSpec;
import io.druid.segment.ColumnSelectorFactory;
Expand All @@ -33,7 +38,12 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
* Grouper based around a set of underlying {@link SpillingGrouper} instances. Thread-safe.
Expand All @@ -51,7 +61,6 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
private final AtomicInteger threadNumber = new AtomicInteger();
private volatile boolean spilling = false;
private volatile boolean closed = false;
private final Comparator<Grouper.Entry<KeyType>> keyObjComparator;

private final Supplier<ByteBuffer> bufferSupplier;
private final ColumnSelectorFactory columnSelectorFactory;
Expand All @@ -65,6 +74,11 @@ public class ConcurrentGrouper<KeyType> implements Grouper<KeyType>
private final KeySerdeFactory<KeyType> keySerdeFactory;
private final DefaultLimitSpec limitSpec;
private final boolean sortHasNonGroupingFields;
private final Comparator<Grouper.Entry<KeyType>> keyObjComparator;
private final ListeningExecutorService grouperSorter;
private final int priority;
private final boolean hasQueryTimeout;
private final long queryTimeoutAt;

private volatile boolean initialized = false;

Expand All @@ -80,7 +94,11 @@ public ConcurrentGrouper(
final ObjectMapper spillMapper,
final int concurrencyHint,
final DefaultLimitSpec limitSpec,
final boolean sortHasNonGroupingFields
final boolean sortHasNonGroupingFields,
final ListeningExecutorService grouperSorter,
final int priority,
final boolean hasQueryTimeout,
final long queryTimeoutAt
)
{
Preconditions.checkArgument(concurrencyHint > 0, "concurrencyHint > 0");
Expand Down Expand Up @@ -108,6 +126,10 @@ protected SpillingGrouper<KeyType> initialValue()
this.limitSpec = limitSpec;
this.sortHasNonGroupingFields = sortHasNonGroupingFields;
this.keyObjComparator = keySerdeFactory.objectComparator(sortHasNonGroupingFields);
this.grouperSorter = Preconditions.checkNotNull(grouperSorter);
this.priority = priority;
this.hasQueryTimeout = hasQueryTimeout;
this.queryTimeoutAt = queryTimeoutAt;
}

@Override
Expand Down Expand Up @@ -216,15 +238,58 @@ public Iterator<Entry<KeyType>> iterator(final boolean sorted)
throw new ISE("Grouper is closed");
}

final List<Iterator<Entry<KeyType>>> iterators = new ArrayList<>(groupers.size());
return Groupers.mergeIterators(
sorted && isParallelSortAvailable() ? parallelSortAndGetGroupersIterator() : getGroupersIterator(sorted),
sorted ? keyObjComparator : null
);
}

for (Grouper<KeyType> grouper : groupers) {
synchronized (grouper) {
iterators.add(grouper.iterator(sorted));
}
/**
 * Tells {@link #iterator(boolean)} whether a sorted iteration may be prepared in parallel.
 *
 * @return true only when this grouper was configured with a concurrency hint greater than one
 */
private boolean isParallelSortAvailable()
{
  final boolean parallelizable = concurrencyHint > 1;
  return parallelizable;
}

/**
 * Requests a sorted iterator from every underlying grouper in parallel, one task per grouper submitted to
 * {@code grouperSorter} with this grouper's {@code priority}, and waits for all of them.
 *
 * When {@code hasQueryTimeout} is set, the wait is bounded by the time remaining until {@code queryTimeoutAt}
 * (compared against {@link System#currentTimeMillis()}); otherwise it waits indefinitely.
 *
 * @return the sorted iterators, in the same order as {@code groupers}
 *
 * @throws QueryInterruptedException if the wait is interrupted, times out, or the tasks are cancelled
 * @throws RuntimeException          wrapping the cause if any sort task fails
 */
private List<Iterator<Entry<KeyType>>> parallelSortAndGetGroupersIterator()
{
  // The number of groupers is the same as the number of processing threads in grouperSorter
  final List<ListenableFuture<Iterator<Entry<KeyType>>>> sortFutures =
      groupers.stream()
              .map(grouper ->
                       grouperSorter.submit(
                           new AbstractPrioritizedCallable<Iterator<Entry<KeyType>>>(priority)
                           {
                             @Override
                             public Iterator<Entry<KeyType>> call() throws Exception
                             {
                               return grouper.iterator(true);
                             }
                           }
                       )
              )
              .collect(Collectors.toList());
  final ListenableFuture<List<Iterator<Entry<KeyType>>>> future = Futures.allAsList(sortFutures);

  try {
    final long timeout = queryTimeoutAt - System.currentTimeMillis();
    return hasQueryTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
  }
  catch (InterruptedException | TimeoutException e) {
    // Cancelling the combined future propagates the cancellation to every component sort task.
    future.cancel(true);
    throw new QueryInterruptedException(e);
  }
  catch (CancellationException e) {
    throw new QueryInterruptedException(e);
  }
  catch (ExecutionException e) {
    // allAsList fails as soon as one task fails, but the sibling tasks keep running. The combined future is
    // already done at this point, so cancelling it would not propagate; cancel the component futures directly.
    // Cancelling a task that has already completed is a no-op.
    for (ListenableFuture<Iterator<Entry<KeyType>>> sortFuture : sortFutures) {
      sortFuture.cancel(true);
    }
    throw new RuntimeException(e.getCause());
  }
}

return Groupers.mergeIterators(iterators, sorted ? keyObjComparator : null);
/**
 * Collects one iterator per underlying grouper on the calling thread, in the order of {@code groupers}.
 *
 * @param sorted forwarded unchanged to each grouper's {@code iterator(boolean)}
 *
 * @return a list holding one iterator for every grouper
 */
private List<Iterator<Entry<KeyType>>> getGroupersIterator(boolean sorted)
{
  final List<Iterator<Entry<KeyType>>> perGrouperIterators = new ArrayList<>(groupers.size());
  for (Grouper<KeyType> each : groupers) {
    perGrouperIterators.add(each.iterator(sorted));
  }
  return perGrouperIterators;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,11 @@ public CloseableGrouperIterator<RowBasedKey, Row> make()
concurrencyHint,
temporaryStorage,
spillMapper,
combiningAggregatorFactories
combiningAggregatorFactories,
exec,
priority,
hasTimeout,
timeoutAt
);
final Grouper<RowBasedKey> grouper = pair.lhs;
final Accumulator<AggregateResult, Row> accumulator = pair.rhs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ public ByteBuffer get()
return mergeBufferHolder.get();
}
},
-1,
temporaryStorage,
spillMapper,
aggregatorFactories
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ default ToIntFunction<KeyType> hashFunction()
* <p>
* Once this method is called, writes are no longer safe. After you are done with the iterator returned by this
* method, you should either call {@link #close()} (if you are done with the Grouper), {@link #reset()} (if you
* want to reuse it), or {@link #iterator(boolean)} again if you want another iterator.
* want to reuse it), or {@link #iterator(boolean)} again if you want another iterator. This method is not thread-safe
* and must not be called by multiple threads concurrently.
* <p>
* If "sorted" is true then the iterator will return sorted results. It will use KeyType's natural ordering on
* deserialized objects, and will use the {@link KeySerde#comparator()} on serialized objects. Woe be unto you
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.common.primitives.Floats;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.java.util.common.IAE;
Expand Down Expand Up @@ -65,6 +66,7 @@
import io.druid.segment.data.IndexedInts;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -78,6 +80,41 @@
// this class contains shared code between GroupByMergingQueryRunnerV2 and GroupByRowProcessor
public class RowBasedGrouperHelper
{
// Concurrency hint passed by the single-threaded createGrouperAccumulatorPair overload; -1 means single-threaded.
private static final int SINGLE_THREAD_CONCURRENCY_HINT = -1;
// Placeholder priority passed by the single-threaded overload, which supplies no grouperSorter executor.
private static final int UNKNOWN_THREAD_PRIORITY = -1;
// Placeholder timeout passed by the single-threaded overload together with hasQueryTimeout = false.
private static final long UNKNOWN_TIMEOUT = -1L;

/**
 * Create a single-threaded grouper and accumulator.
 *
 * Convenience overload that delegates to the full {@code createGrouperAccumulatorPair} variant with
 * {@link #SINGLE_THREAD_CONCURRENCY_HINT}, no sorter executor, and placeholder priority and timeout values.
 */
public static Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> createGrouperAccumulatorPair(
    final GroupByQuery query,
    final boolean isInputRaw,
    final Map<String, ValueType> rawInputRowSignature,
    final GroupByQueryConfig config,
    final Supplier<ByteBuffer> bufferSupplier,
    final LimitedTemporaryStorage temporaryStorage,
    final ObjectMapper spillMapper,
    final AggregatorFactory[] aggregatorFactories
)
{
  // No executor is supplied on this path, and the query timeout is marked as absent.
  final ListeningExecutorService noGrouperSorter = null;
  final boolean noQueryTimeout = false;

  return createGrouperAccumulatorPair(
      query, isInputRaw, rawInputRowSignature, config, bufferSupplier,
      SINGLE_THREAD_CONCURRENCY_HINT,
      temporaryStorage, spillMapper, aggregatorFactories,
      noGrouperSorter,
      UNKNOWN_THREAD_PRIORITY,
      noQueryTimeout,
      UNKNOWN_TIMEOUT
  );
}

/**
* If isInputRaw is true, transformations such as timestamp truncation and extraction functions have not
* been applied to the input rows yet, for example, in a nested query, if an extraction function is being
Expand All @@ -92,7 +129,11 @@ public static Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> crea
final int concurrencyHint,
final LimitedTemporaryStorage temporaryStorage,
final ObjectMapper spillMapper,
final AggregatorFactory[] aggregatorFactories
final AggregatorFactory[] aggregatorFactories,
@Nullable final ListeningExecutorService grouperSorter,
final int priority,
final boolean hasQueryTimeout,
final long queryTimeoutAt
)
{
// concurrencyHint >= 1 for concurrent groupers, -1 for single-threaded
Expand Down Expand Up @@ -160,7 +201,11 @@ public static Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> crea
spillMapper,
concurrencyHint,
limitSpec,
sortHasNonGroupingFields
sortHasNonGroupingFields,
grouperSorter,
priority,
hasQueryTimeout,
queryTimeoutAt
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.google.common.base.Supplier;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.MoreExecutors;
import io.druid.concurrent.Execs;
import io.druid.java.util.common.IAE;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
Expand Down Expand Up @@ -187,7 +189,7 @@ public DoubleColumnSelector makeDoubleColumnSelector(String columnName)
}
};

@Test
@Test(timeout = 5000L)
public void testAggregate() throws InterruptedException, ExecutionException
{
final ConcurrentGrouper<Long> grouper = new ConcurrentGrouper<>(
Expand All @@ -202,7 +204,11 @@ public void testAggregate() throws InterruptedException, ExecutionException
null,
8,
null,
false
false,
MoreExecutors.listeningDecorator(Execs.multiThreaded(4, "concurrent-grouper-test-%d")),
0,
false,
0
);

Future<?>[] futures = new Future[8];
Expand Down