Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
import com.google.common.base.Supplier;
import org.apache.druid.java.util.common.Cleaners;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;

import java.lang.ref.WeakReference;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -105,7 +106,8 @@ public static boolean isPoisoned()
private final AtomicLong createdObjectsCounter = new AtomicLong(0);
private final AtomicLong leakedObjectsCounter = new AtomicLong(0);

private final AtomicReference<RuntimeException> capturedException = new AtomicReference<>(null);
private final AtomicReference<CopyOnWriteArrayList<LeakedException>> capturedException =
new AtomicReference<>(null);

//note that this is just the max entries in the cache, pool can still create as many buffers as needed.
private final int objectsCacheMaxCount;
Expand Down Expand Up @@ -149,30 +151,41 @@ public ResourceHolder<T> take()
ObjectResourceHolder resourceHolder = objects.poll();
if (resourceHolder == null) {
if (POISONED.get() && capturedException.get() != null) {
throw capturedException.get();
throw makeExceptionForLeaks(capturedException.get());
}
return makeObjectWithHandler();
} else {
poolSize.decrementAndGet();
if (POISONED.get()) {
final RuntimeException exception = capturedException.get();
if (exception == null) {
resourceHolder.notifier.except = new RE("Thread[%s]: leaky leak!", Thread.currentThread().getName());
final CopyOnWriteArrayList<LeakedException> exceptionList = capturedException.get();
if (exceptionList == null) {
resourceHolder.notifier.except = new LeakedException(Thread.currentThread().getName());
} else {
throw exception;
throw makeExceptionForLeaks(exceptionList);
}
}
return resourceHolder;
}
}

/**
 * Builds a single aggregate exception from every leak recorded while the pool was poisoned.
 * Each suppressed exception identifies one checkout path that never returned its object.
 */
private RuntimeException makeExceptionForLeaks(CopyOnWriteArrayList<LeakedException> exceptionList)
{
  final RuntimeException aggregate = new RuntimeException(
      "Leaks happened, each suppressed exception represents one code path that checked out an object and didn't return it."
  );
  // Attach every recorded leak so a single throw surfaces all offending call sites at once.
  exceptionList.forEach(aggregate::addSuppressed);
  return aggregate;
}

/**
 * Creates a brand-new pooled object together with the leak-detection plumbing
 * (a Cleaner registration that fires if the holder is garbage-collected unclosed).
 */
private ObjectResourceHolder makeObjectWithHandler()
{
  final T newObject = generator.get();
  createdObjectsCounter.incrementAndGet();
  final ObjectId id = new ObjectId();
  final ObjectLeakNotifier leakNotifier = new ObjectLeakNotifier(this, POISONED.get());
  // The Cleaner's referent is the ObjectId rather than the pooled object itself: if the object
  // (e.g. a ByteBuffer) escapes while its ResourceHolder is never closed, a Cleaner registered on
  // the object would never fire and the leak would go unreported.
  return new ObjectResourceHolder(newObject, id, Cleaners.register(id, leakNotifier), leakNotifier);
}
Expand All @@ -198,6 +211,7 @@ public long objectsCreatedCount()
private void tryReturnToPool(T object, ObjectId objectId, Cleaners.Cleanable cleanable, ObjectLeakNotifier notifier)
{
long currentPoolSize;
notifier.except = null;
do {
currentPoolSize = poolSize.get();
if (currentPoolSize >= objectsCacheMaxCount) {
Expand Down Expand Up @@ -310,14 +324,14 @@ private static class ObjectLeakNotifier implements Runnable
final AtomicLong leakedObjectsCounter;
final AtomicBoolean disabled = new AtomicBoolean(false);

private RuntimeException except;
private LeakedException except;

ObjectLeakNotifier(StupidPool<?> pool, boolean poisoned)
{
  // Weak reference so the notifier does not keep an otherwise-unreachable pool alive.
  poolReference = new WeakReference<>(pool);
  leakedObjectsCounter = pool.leakedObjectsCounter;
  if (poisoned) {
    // Capture the checkout thread's name now; message rendering is deferred to getMessage().
    except = new LeakedException(Thread.currentThread().getName());
  } else {
    except = null;
  }
}

@Override
Expand All @@ -329,7 +343,12 @@ public void run()
final StupidPool<?> pool = poolReference.get();
log.warn("Not closed! Object leaked from %s. Allowing gc to prevent leak.", pool);
if (except != null && pool != null) {
pool.capturedException.set(except);
CopyOnWriteArrayList<LeakedException> exceptions = pool.capturedException.get();
if (exceptions == null) {
pool.capturedException.compareAndSet(null, new CopyOnWriteArrayList<>());
}
exceptions = pool.capturedException.get();
exceptions.add(except);
log.error(except, "notifier[%s], dumping stack trace from object checkout and poisoning pool", this);
}
}
Expand Down Expand Up @@ -357,4 +376,25 @@ public void disable()
private static class ObjectId
{
}

/**
 * Exception recorded when a pooled object is checked out while the pool is poisoned.
 * <p>
 * It exists primarily to defer string interpolation to when {@link #getMessage()} is called
 * instead of interpolating in the constructor. While not a primary bottleneck, the string
 * interpolation for poisoned stupid pools does show up in profiling, so avoiding it is just
 * good hygiene.
 */
private static class LeakedException extends RuntimeException
{
  // RuntimeException is Serializable; declare an explicit version id rather than relying on
  // the compiler-generated one, which can change across otherwise-compatible builds.
  private static final long serialVersionUID = 1L;

  // Name of the thread that checked the object out; interpolated lazily in getMessage().
  private final String threadName;

  public LeakedException(String threadName)
  {
    this.threadName = threadName;
  }

  @Override
  public String getMessage()
  {
    // Built only if the exception is actually rendered/logged — the whole point of this class.
    return StringUtils.format("Originally checked out by thread [%s]", threadName);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@

package org.apache.druid.query.operator;

import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.List;

public class LimitTimeIntervalOperator implements Operator
{
Expand All @@ -37,16 +34,11 @@ public class LimitTimeIntervalOperator implements Operator

/**
 * Wraps the segment operator so that, when pushed, rows outside {@code interval} are trimmed.
 *
 * @param segmentOperator upstream operator producing the segment's rows-and-columns
 * @param interval        the single time interval rows must fall within
 */
public LimitTimeIntervalOperator(
    Operator segmentOperator,
    Interval interval
)
{
  this.segmentOperator = segmentOperator;
  this.interval = interval;
}

@Nullable
Expand All @@ -61,11 +53,13 @@ public Closeable goOrContinue(
@Override
public Signal push(RowsAndColumns rac)
{
final RowsAndColumnsDecorator decor = RowsAndColumnsDecorator.fromRAC(rac);
if (!Intervals.isEternity(interval)) {
if (Intervals.isEternity(interval)) {
return receiver.push(rac);
} else {
final RowsAndColumnsDecorator decor = RowsAndColumnsDecorator.fromRAC(rac);
decor.limitTimeRange(interval);
return receiver.push(decor.toRowsAndColumns());
}
return receiver.push(decor.toRowsAndColumns());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,25 @@
package org.apache.druid.query.operator;

import com.google.common.base.Function;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.RowSignature;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.List;

public class WindowOperatorQueryQueryRunnerFactory implements QueryRunnerFactory<RowsAndColumns, WindowOperatorQuery>
{
Expand All @@ -41,10 +50,21 @@ public QueryRunner<RowsAndColumns> createRunner(Segment segment)
return (queryPlus, responseContext) ->
new OperatorSequence(() -> {
Operator op = new SegmentToRowsAndColumnsOperator(segment);
op = new LimitTimeIntervalOperator(op, queryPlus);

final List<Interval> intervals = queryPlus.getQuery().getIntervals();
if (intervals.size() != 1) {
throw DruidException.defensive("Can only handle a single interval, got [%s]", intervals);
}

final Interval interval = intervals.get(0);
if (!Intervals.isEternity(interval)) {
op = new LimitTimeIntervalOperator(op, interval);
}

for (OperatorFactory leaf : ((WindowOperatorQuery) queryPlus.getQuery()).getLeafOperators()) {
op = leaf.wrap(op);
}

return op;
});
}
Expand All @@ -58,20 +78,47 @@ public QueryRunner<RowsAndColumns> mergeRunners(
// This merge is extremely naive, there is no ordering being imposed over the data, nor is there any attempt
// to shrink the size of the data before pushing it across the wire. This code implementation is intended more
// to make this work for tests and less to work in production. That's why the WindowOperatorQuery forces
// a super-secrete context parameter to be set to actually allow it to run a query that pushes all the way down
// a super-secret context parameter to be set to actually allow it to run a query that pushes all the way down
// like this. When this gets fixed, we can remove that parameter.
return (queryPlus, responseContext) -> Sequences.concat(
Sequences.map(
Sequences.simple(queryRunners),
new Function<QueryRunner<RowsAndColumns>, Sequence<RowsAndColumns>>()
{
@SuppressWarnings("ConstantConditions")
@Nullable
@Override
public Sequence<RowsAndColumns> apply(
@Nullable QueryRunner<RowsAndColumns> input
)
{
return input.run(queryPlus, responseContext);
return Sequences.map(
input.run(queryPlus, responseContext),
new Function<RowsAndColumns, RowsAndColumns>()
{
@Nullable
@Override
public RowsAndColumns apply(@Nullable RowsAndColumns input)
{
// This is interim code to force a materialization by synthesizing the wire transfer
// that will need to naturally happen as we flesh out this code more. For now, we
// materialize the bytes on-heap and then read them back in as a frame.
if (input instanceof LazilyDecoratedRowsAndColumns) {
final WireTransferable wire = WireTransferable.fromRAC(input);
final byte[] frameBytes = wire.bytesToTransfer();

RowSignature.Builder sigBob = RowSignature.builder();
for (String column : input.getColumnNames()) {
sigBob.add(column, input.findColumn(column).toAccessor().getType());
}

return new FrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build());
}
return input;
}
}
);

}
}
)
Expand Down
Loading