diff --git a/processing/src/main/java/org/apache/druid/collections/StupidPool.java b/processing/src/main/java/org/apache/druid/collections/StupidPool.java
index ced36e3a9ddc..06536c5d33fc 100644
--- a/processing/src/main/java/org/apache/druid/collections/StupidPool.java
+++ b/processing/src/main/java/org/apache/druid/collections/StupidPool.java
@@ -24,12 +24,13 @@
 import com.google.common.base.Supplier;
 import org.apache.druid.java.util.common.Cleaners;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
 
 import java.lang.ref.WeakReference;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -105,7 +106,8 @@ public static boolean isPoisoned()
   private final AtomicLong createdObjectsCounter = new AtomicLong(0);
   private final AtomicLong leakedObjectsCounter = new AtomicLong(0);
 
-  private final AtomicReference<RuntimeException> capturedException = new AtomicReference<>(null);
+  private final AtomicReference<CopyOnWriteArrayList<LeakedException>> capturedException =
+      new AtomicReference<>(null);
 
   //note that this is just the max entries in the cache, pool can still create as many buffers as needed.
   private final int objectsCacheMaxCount;
@@ -149,30 +151,41 @@ public ResourceHolder<T> take()
     ObjectResourceHolder resourceHolder = objects.poll();
     if (resourceHolder == null) {
       if (POISONED.get() && capturedException.get() != null) {
-        throw capturedException.get();
+        throw makeExceptionForLeaks(capturedException.get());
       }
       return makeObjectWithHandler();
     } else {
       poolSize.decrementAndGet();
       if (POISONED.get()) {
-        final RuntimeException exception = capturedException.get();
-        if (exception == null) {
-          resourceHolder.notifier.except = new RE("Thread[%s]: leaky leak!", Thread.currentThread().getName());
+        final CopyOnWriteArrayList<LeakedException> exceptionList = capturedException.get();
+        if (exceptionList == null) {
+          resourceHolder.notifier.except = new LeakedException(Thread.currentThread().getName());
         } else {
-          throw exception;
+          throw makeExceptionForLeaks(exceptionList);
         }
       }
       return resourceHolder;
     }
   }
 
+  private RuntimeException makeExceptionForLeaks(CopyOnWriteArrayList<LeakedException> exceptionList)
+  {
+    RuntimeException toThrow = new RuntimeException(
+        "Leaks happened, each suppressed exception represents one code path that checked out an object and didn't return it."
+    );
+    for (LeakedException exception : exceptionList) {
+      toThrow.addSuppressed(exception);
+    }
+    return toThrow;
+  }
+
   private ObjectResourceHolder makeObjectWithHandler()
   {
     T object = generator.get();
     createdObjectsCounter.incrementAndGet();
     ObjectId objectId = new ObjectId();
     ObjectLeakNotifier notifier = new ObjectLeakNotifier(this, POISONED.get());
-    // Using objectId as referent for Cleaner, because if the object itself (e. g. ByteBuffer) is leaked after taken
+    // Using objectId as referent for Cleaner, because if the object itself (e.g. ByteBuffer) is leaked after taken
     // from the pool, and the ResourceHolder is not closed, Cleaner won't notify about the leak.
     return new ObjectResourceHolder(object, objectId, Cleaners.register(objectId, notifier), notifier);
   }
 
@@ -198,6 +211,7 @@ public long objectsCreatedCount()
   private void tryReturnToPool(T object, ObjectId objectId, Cleaners.Cleanable cleanable, ObjectLeakNotifier notifier)
   {
     long currentPoolSize;
+    notifier.except = null;
     do {
       currentPoolSize = poolSize.get();
       if (currentPoolSize >= objectsCacheMaxCount) {
@@ -310,14 +324,14 @@ private static class ObjectLeakNotifier implements Runnable
     final AtomicLong leakedObjectsCounter;
     final AtomicBoolean disabled = new AtomicBoolean(false);
 
-    private RuntimeException except;
+    private LeakedException except;
 
     ObjectLeakNotifier(StupidPool<?> pool, boolean poisoned)
     {
       poolReference = new WeakReference<>(pool);
       leakedObjectsCounter = pool.leakedObjectsCounter;
 
-      except = poisoned ? new RE("Thread[%s]: drip drip", Thread.currentThread().getName()) : null;
+      except = poisoned ? new LeakedException(Thread.currentThread().getName()) : null;
     }
 
     @Override
@@ -329,7 +343,12 @@ public void run()
         final StupidPool<?> pool = poolReference.get();
         log.warn("Not closed! Object leaked from %s. Allowing gc to prevent leak.", pool);
         if (except != null && pool != null) {
-          pool.capturedException.set(except);
+          CopyOnWriteArrayList<LeakedException> exceptions = pool.capturedException.get();
+          if (exceptions == null) {
+            pool.capturedException.compareAndSet(null, new CopyOnWriteArrayList<>());
+          }
+          exceptions = pool.capturedException.get();
+          exceptions.add(except);
           log.error(except, "notifier[%s], dumping stack trace from object checkout and poisoning pool", this);
         }
       }
@@ -357,4 +376,25 @@ public void disable()
   private static class ObjectId
   {
   }
+
+  /**
+   * This exception exists primarily to defer string interpolation to when getMessage is called instead of
+   * interpolating in the constructor.  While not a primary bottleneck, the string interpolation for poisoned stupid
+   * pools does show up in profiling, so avoiding it is just good hygiene.
+   */
+  private static class LeakedException extends RuntimeException
+  {
+    private final String threadName;
+
+    public LeakedException(String threadName)
+    {
+      this.threadName = threadName;
+    }
+
+    @Override
+    public String getMessage()
+    {
+      return StringUtils.format("Originally checked out by thread [%s]", threadName);
+    }
+  }
 }
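Editor's note: the two ideas the StupidPool change combines are easier to see outside the pool. Below is a minimal standalone sketch (plain JDK, not part of the patch; all names hypothetical): an exception whose message is only interpolated when getMessage() is actually read, and a single report exception carrying one suppressed exception per leaked checkout.

```java
// Sketch only: mirrors the LeakedException / makeExceptionForLeaks pattern above.
public class LeakReportSketch
{
  // Stand-in for LeakedException: the message is built lazily, so creating one is cheap.
  static class CheapException extends RuntimeException
  {
    private final String threadName;

    CheapException(String threadName)
    {
      this.threadName = threadName;
    }

    @Override
    public String getMessage()
    {
      // The format call only runs if somebody actually asks for the message.
      return String.format("Originally checked out by thread [%s]", threadName);
    }
  }

  public static void main(String[] args)
  {
    // Stand-in for makeExceptionForLeaks: one suppressed entry per leaky checkout,
    // so a single thrown exception reports every offending code path at once.
    RuntimeException report = new RuntimeException("2 leaks detected");
    report.addSuppressed(new CheapException("worker-1"));
    report.addSuppressed(new CheapException("worker-2"));
    report.printStackTrace();
  }
}
```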
diff --git a/processing/src/main/java/org/apache/druid/query/operator/LimitTimeIntervalOperator.java b/processing/src/main/java/org/apache/druid/query/operator/LimitTimeIntervalOperator.java
index b658335e6a9b..b5768fe9cd22 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/LimitTimeIntervalOperator.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/LimitTimeIntervalOperator.java
@@ -19,16 +19,13 @@
 
 package org.apache.druid.query.operator;
 
-import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.io.Closeable;
-import java.util.List;
 
 public class LimitTimeIntervalOperator implements Operator
 {
@@ -37,16 +34,11 @@ public class LimitTimeIntervalOperator implements Operator
 
   public LimitTimeIntervalOperator(
       Operator segmentOperator,
-      QueryPlus<RowsAndColumns> queryPlus
+      Interval interval
   )
   {
     this.segmentOperator = segmentOperator;
-
-    final List<Interval> intervals = queryPlus.getQuery().getIntervals();
-    if (intervals.size() != 1) {
-      throw new ISE("Can only handle a single interval, got[%s]", intervals);
-    }
-    interval = intervals.get(0);
+    this.interval = interval;
   }
 
   @Nullable
@@ -61,11 +53,13 @@ public Closeable goOrContinue(
       @Override
       public Signal push(RowsAndColumns rac)
       {
-        final RowsAndColumnsDecorator decor = RowsAndColumnsDecorator.fromRAC(rac);
-        if (!Intervals.isEternity(interval)) {
+        if (Intervals.isEternity(interval)) {
+          return receiver.push(rac);
+        } else {
+          final RowsAndColumnsDecorator decor = RowsAndColumnsDecorator.fromRAC(rac);
           decor.limitTimeRange(interval);
+          return receiver.push(decor.toRowsAndColumns());
         }
-        return receiver.push(decor.toRowsAndColumns());
       }
 
       @Override
diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java
index d716524bb06c..b58aff80b4fb 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java
@@ -20,16 +20,25 @@
 package org.apache.druid.query.operator;
 
 import com.google.common.base.Function;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerFactory;
 import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.column.RowSignature;
+import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
+import java.util.List;
 
 public class WindowOperatorQueryQueryRunnerFactory implements QueryRunnerFactory<RowsAndColumns, WindowOperatorQuery>
 {
@@ -41,10 +50,21 @@ public QueryRunner<RowsAndColumns> createRunner(Segment segment)
     return (queryPlus, responseContext) -> new OperatorSequence(() -> {
       Operator op = new SegmentToRowsAndColumnsOperator(segment);
-      op = new LimitTimeIntervalOperator(op, queryPlus);
+
+      final List<Interval> intervals = queryPlus.getQuery().getIntervals();
+      if (intervals.size() != 1) {
+        throw DruidException.defensive("Can only handle a single interval, got [%s]", intervals);
+      }
+
+      final Interval interval = intervals.get(0);
+      if (!Intervals.isEternity(interval)) {
+        op = new LimitTimeIntervalOperator(op, interval);
+      }
+
       for (OperatorFactory leaf : ((WindowOperatorQuery) queryPlus.getQuery()).getLeafOperators()) {
         op = leaf.wrap(op);
       }
+
       return op;
     });
   }
@@ -58,20 +78,47 @@ public QueryRunner<RowsAndColumns> mergeRunners(
     // This merge is extremely naive, there is no ordering being imposed over the data, nor is there any attempt
     // to shrink the size of the data before pushing it across the wire.  This code implementation is intended more
     // to make this work for tests and less to work in production.  That's why the WindowOperatorQuery forces
-    // a super-secrete context parameter to be set to actually allow it to run a query that pushes all the way down
+    // a super-secret context parameter to be set to actually allow it to run a query that pushes all the way down
     // like this.  When this gets fixed, we can remove that parameter.
     return (queryPlus, responseContext) -> Sequences.concat(
         Sequences.map(
            Sequences.simple(queryRunners),
            new Function<QueryRunner<RowsAndColumns>, Sequence<RowsAndColumns>>()
            {
+             @SuppressWarnings("ConstantConditions")
              @Nullable
              @Override
              public Sequence<RowsAndColumns> apply(
                  @Nullable QueryRunner<RowsAndColumns> input
              )
              {
-               return input.run(queryPlus, responseContext);
+               return Sequences.map(
+                   input.run(queryPlus, responseContext),
+                   new Function<RowsAndColumns, RowsAndColumns>()
+                   {
+                     @Nullable
+                     @Override
+                     public RowsAndColumns apply(@Nullable RowsAndColumns input)
+                     {
+                       // This is interim code to force a materialization by synthesizing the wire transfer
+                       // that will need to naturally happen as we flesh out this code more.  For now, we
+                       // materialize the bytes on-heap and then read them back in as a frame.
+                       if (input instanceof LazilyDecoratedRowsAndColumns) {
+                         final WireTransferable wire = WireTransferable.fromRAC(input);
+                         final byte[] frameBytes = wire.bytesToTransfer();
+
+                         RowSignature.Builder sigBob = RowSignature.builder();
+                         for (String column : input.getColumnNames()) {
+                           sigBob.add(column, input.findColumn(column).toAccessor().getType());
+                         }
+
+                         return new FrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build());
+                       }
+                       return input;
+                     }
+                   }
+               );
+             }
            }
        )
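Editor's note: the anonymous-class plumbing above buries the core move, which is to force a lazily decorated RowsAndColumns into a concrete frame-backed form by round-tripping it through its wire format. Below is the same logic restated as a standalone helper; a sketch that assumes the classes this patch introduces or touches (WireTransferable, FrameRowsAndColumns), not independent code.

```java
import org.apache.druid.frame.Frame;
import org.apache.druid.query.rowsandcols.RowsAndColumns;
import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns;
import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
import org.apache.druid.segment.column.RowSignature;

// Sketch only: restates the mapping function from mergeRunners() above.
public class ForceMaterialization
{
  static RowsAndColumns toFrameBacked(RowsAndColumns rac)
  {
    // Serializing to the wire format forces any pending decorations to be applied...
    final WireTransferable wire = WireTransferable.fromRAC(rac);
    final byte[] frameBytes = wire.bytesToTransfer();

    // ...then the signature is rebuilt and the bytes are wrapped back up as a frame.
    final RowSignature.Builder builder = RowSignature.builder();
    for (String column : rac.getColumnNames()) {
      builder.add(column, rac.findColumn(column).toAccessor().getType());
    }
    return new FrameRowsAndColumns(Frame.wrap(frameBytes), builder.build());
  }
}
```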
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java
index e6ef061d7150..4a62106a557c 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java
@@ -29,6 +29,7 @@
 import org.apache.druid.frame.write.FrameWriters;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
@@ -41,6 +42,7 @@
 import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
 import org.apache.druid.query.rowsandcols.semantic.DefaultRowsAndColumnsDecorator;
 import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator;
+import org.apache.druid.query.rowsandcols.semantic.WireTransferable;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.StorageAdapter;
@@ -56,11 +58,16 @@
 import java.util.Collections;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 
 public class LazilyDecoratedRowsAndColumns implements RowsAndColumns
 {
+  private static final Map<Class<?>, Function<LazilyDecoratedRowsAndColumns, ?>> AS_MAP =
+      RowsAndColumns.makeAsMap(LazilyDecoratedRowsAndColumns.class);
+
   private RowsAndColumns base;
   private Interval interval;
   private Filter filter;
@@ -118,33 +125,50 @@ public Column findColumn(String name)
   @Override
   public <T> T as(Class<T> clazz)
   {
-    if (RowsAndColumnsDecorator.class.equals(clazz)) {
-      // If we don't have a projection defined, then it's safe to continue collecting more decorations as we
-      // can meaningfully merge them together.
-      if (viewableColumns == null || viewableColumns.isEmpty()) {
-        return (T) new DefaultRowsAndColumnsDecorator(
-            base,
-            interval,
-            filter,
-            virtualColumns,
-            limit,
-            ordering
-        );
+    //noinspection ReturnOfNull
+    return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this);
+  }
+
+  @SuppressWarnings("unused")
+  @SemanticCreator
+  public RowsAndColumnsDecorator toRowsAndColumnsDecorator()
+  {
+    // If we don't have a projection defined, then it's safe to continue collecting more decorations as we
+    // can meaningfully merge them together.
+    if (viewableColumns == null || viewableColumns.isEmpty()) {
+      return new DefaultRowsAndColumnsDecorator(base, interval, filter, virtualColumns, limit, ordering);
+    } else {
+      return new DefaultRowsAndColumnsDecorator(this);
+    }
+  }
+
+  @SuppressWarnings("unused")
+  @SemanticCreator
+  public WireTransferable toWireTransferable()
+  {
+    return () -> {
+      final Pair<byte[], RowSignature> materialized = materialize();
+      if (materialized == null) {
+        return new byte[]{};
       } else {
-        return (T) new DefaultRowsAndColumnsDecorator(this);
+        return materialized.lhs;
      }
-    }
-    return null;
+    };
   }
 
   private void maybeMaterialize()
   {
     if (!(interval == null && filter == null && limit == -1 && ordering == null)) {
-      materialize();
+      final Pair<byte[], RowSignature> thePair = materialize();
+      if (thePair == null) {
+        reset(new EmptyRowsAndColumns());
+      } else {
+        reset(new FrameRowsAndColumns(Frame.wrap(thePair.lhs), thePair.rhs));
+      }
     }
   }
 
-  private void materialize()
+  private Pair<byte[], RowSignature> materialize()
   {
     if (ordering != null) {
       throw new ISE("Cannot reorder[%s] scan data right now", ordering);
@@ -152,13 +176,26 @@ private void materialize()
 
     final StorageAdapter as = base.as(StorageAdapter.class);
     if (as == null) {
-      reset(naiveMaterialize(base));
+      return naiveMaterialize(base);
     } else {
-      reset(materializeStorageAdapter(as));
+      return materializeStorageAdapter(as);
     }
+  }
 
-  private RowsAndColumns materializeStorageAdapter(StorageAdapter as)
+  private void reset(RowsAndColumns rac)
+  {
+    base = rac;
+    interval = null;
+    filter = null;
+    virtualColumns = null;
+    limit = -1;
+    viewableColumns = null;
+    ordering = null;
+  }
+
+  @Nullable
+  private Pair<byte[], RowSignature> materializeStorageAdapter(StorageAdapter as)
   {
     final Sequence<Cursor> cursors = as.makeCursors(
         filter,
@@ -231,25 +268,15 @@ private RowsAndColumns materializeStorageAdapter(StorageAdapter as)
       // This means that the accumulate was never called, which can only happen if we didn't have any cursors.
       // We would only have zero cursors if we essentially didn't match anything, meaning that our RowsAndColumns
       // should be completely empty.
-      return new EmptyRowsAndColumns();
+      return null;
     } else {
       final byte[] bytes = writer.toByteArray();
-      return new FrameRowsAndColumns(Frame.wrap(bytes), siggy.get());
+      return Pair.of(bytes, siggy.get());
     }
   }
 
-  private void reset(RowsAndColumns rac)
-  {
-    base = rac;
-    interval = null;
-    filter = null;
-    virtualColumns = null;
-    limit = -1;
-    viewableColumns = null;
-    ordering = null;
-  }
-
-  private RowsAndColumns naiveMaterialize(RowsAndColumns rac)
+  @Nullable
+  private Pair<byte[], RowSignature> naiveMaterialize(RowsAndColumns rac)
   {
     final int numRows = rac.numRows();
 
@@ -264,7 +291,7 @@ private RowsAndColumns naiveMaterialize(RowsAndColumns rac)
       // long as is required by the time filter produces all 0s, so either 0 is included and matches all rows or
       // it's not and we skip all rows.
       if (!interval.contains(0)) {
-        return new EmptyRowsAndColumns();
+        return null;
       }
     } else {
       final ColumnAccessor accessor = racColumn.toAccessor();
@@ -355,7 +382,6 @@ private RowsAndColumns naiveMaterialize(RowsAndColumns rac)
       frameWriter.addSelection();
     }
 
-    return new FrameRowsAndColumns(Frame.wrap(frameWriter.toByteArray()), sigBob.build());
+    return Pair.of(frameWriter.toByteArray(), sigBob.build());
   }
-
 }
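Editor's note: stripped of the Druid types, the shape of LazilyDecoratedRowsAndColumns is easier to see. Below is a plain-JDK sketch (hypothetical LazyView class, not from the patch) of the accumulate-then-materialize pattern, including the same reset-on-materialize behavior the class uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Sketch only: decorations are recorded as fields, no work happens until a read
// forces it, and materializing clears the decorations so they are applied once.
public class LazyView<T>
{
  private List<T> base;
  private Predicate<T> filter;  // analogous to the Filter / Interval fields
  private int limit = -1;       // analogous to the limit field

  public LazyView(List<T> base)
  {
    this.base = base;
  }

  public LazyView<T> where(Predicate<T> p)
  {
    this.filter = p;  // just recorded; nothing is evaluated yet
    return this;
  }

  public LazyView<T> limit(int n)
  {
    this.limit = n;
    return this;
  }

  public List<T> read()
  {
    if (filter == null && limit == -1) {
      return base;  // nothing pending, like the maybeMaterialize() fast path
    }
    List<T> result = new ArrayList<>();
    for (T t : base) {
      if ((filter == null || filter.test(t)) && (limit == -1 || result.size() < limit)) {
        result.add(t);
      }
    }
    // reset(): the materialized data becomes the new base and decorations clear
    base = result;
    filter = null;
    limit = -1;
    return base;
  }
}
```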
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java
index 7b6a1f6215d3..d139265d147d 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java
@@ -19,13 +19,19 @@
 
 package org.apache.druid.query.rowsandcols;
 
+import org.apache.druid.error.DruidException;
 import org.apache.druid.query.rowsandcols.column.Column;
 import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
 import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Function;
 
 /**
  * An interface representing a chunk of RowsAndColumns. Essentially a RowsAndColumns is just a batch of rows
@@ -69,6 +75,31 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input)
     return retVal;
   }
 
+  static <T extends RowsAndColumns> Map<Class<?>, Function<T, ?>> makeAsMap(Class<T> clazz)
+  {
+    Map<Class<?>, Function<T, ?>> retVal = new HashMap<>();
+
+    for (Method method : clazz.getMethods()) {
+      if (method.isAnnotationPresent(SemanticCreator.class)) {
+        if (method.getParameterCount() != 0) {
+          throw DruidException.defensive("Method [%s] annotated with SemanticCreator was not 0-argument.", method);
+        }
+
+        retVal.put(method.getReturnType(), arg -> {
+          try {
+            return method.invoke(arg);
+          }
+          catch (InvocationTargetException | IllegalAccessException e) {
+            throw DruidException.defensive().build(e, "Problem invoking method [%s]", method);
+          }
+        });
+      }
+    }
+
+    return retVal;
+  }
+
   /**
    * The set of column names available from the RowsAndColumns
    *
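Editor's note: makeAsMap() plus the SemanticCreator annotation (next file) replace hand-written instanceof/equals chains in as() implementations. Below is a self-contained plain-JDK sketch of the mechanism with hypothetical names: scan a class once for annotated zero-argument factory methods, key the resulting map by declared return type, then dispatch through it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch only: a miniature, JDK-only analogue of makeAsMap()/@SemanticCreator.
public class SemanticMapSketch
{
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface Creator {}

  public static class Thing
  {
    @Creator
    public CharSequence toCharSequence()
    {
      return "hello";
    }
  }

  static Map<Class<?>, Function<Thing, ?>> scan(Class<Thing> clazz)
  {
    Map<Class<?>, Function<Thing, ?>> map = new HashMap<>();
    for (Method method : clazz.getMethods()) {
      if (method.isAnnotationPresent(Creator.class)) {
        // Key by declared return type, mirroring retVal.put(method.getReturnType(), ...)
        map.put(method.getReturnType(), thing -> {
          try {
            return method.invoke(thing);
          }
          catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
          }
        });
      }
    }
    return map;
  }

  public static void main(String[] args)
  {
    Map<Class<?>, Function<Thing, ?>> asMap = scan(Thing.class);
    // Dispatch like as(CharSequence.class); unknown classes fall back to null.
    Object result = asMap.getOrDefault(CharSequence.class, t -> null).apply(new Thing());
    System.out.println(result); // prints "hello"
  }
}
```

The one-time reflective scan happens when the static AS_MAP field is initialized, so the per-call cost of as() is a map lookup rather than repeated reflection.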
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/SemanticCreator.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/SemanticCreator.java
new file mode 100644
index 000000000000..bb1af0e4d9f1
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/SemanticCreator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation used to indicate that the method is used as a creator for a semantic interface.
+ *
+ * Used in conjunction with {@link RowsAndColumns#makeAsMap(Class)} to build maps for simplified implementation of
+ * the {@link RowsAndColumns#as(Class)} method.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.METHOD)
+public @interface SemanticCreator
+{
+}
+ */ + +package org.apache.druid.query.rowsandcols; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation used to indicate that the method is used as a creator for a semantic interface. + * + * Used in conjuction with {@link RowsAndColumns#makeAsMap(Class)} to build maps for simplified implementation of + * the {@link RowsAndColumns#as(Class)} method. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface SemanticCreator +{ +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java index e2dc8fa0a9d0..7c6038b38990 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java @@ -19,6 +19,7 @@ package org.apache.druid.query.rowsandcols.concrete; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; @@ -28,10 +29,12 @@ import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.column.ColumnHolder; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import java.util.Collection; import java.util.HashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -41,6 +44,7 @@ public class QueryableIndexRowsAndColumns implements RowsAndColumns, AutoCloseab private final QueryableIndex index; + private final AtomicBoolean closed = new AtomicBoolean(false); private final Closer closer = Closer.create(); private final AtomicInteger numRows = new AtomicInteger(-1); @@ -72,6 +76,9 @@ public int numRows() @Override public Column findColumn(String name) { + if (closed.get()) { + throw DruidException.defensive("Cannot be accessed after being closed!?"); + } final ColumnHolder columnHolder = index.getColumnHolder(name); if (columnHolder == null) { return null; @@ -83,15 +90,18 @@ public Column findColumn(String name) @SuppressWarnings("unchecked") @Nullable @Override - public T as(Class clazz) + public T as(@Nonnull Class clazz) { + //noinspection ReturnOfNull return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this); } @Override public void close() throws IOException { - closer.close(); + if (closed.compareAndSet(false, true)) { + closer.close(); + } } private static HashMap, Function> makeAsMap() diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java index 45915dd9b5c6..6ce7c258ef17 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java @@ -28,6 +28,7 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import javax.annotation.Nullable; import java.util.ArrayList; public class DefaultNaiveSortMaker implements NaiveSortMaker @@ -61,6 
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java
index 45915dd9b5c6..6ce7c258ef17 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java
@@ -28,6 +28,7 @@
 import org.apache.druid.query.rowsandcols.column.Column;
 import org.apache.druid.query.rowsandcols.column.ColumnAccessor;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 
 public class DefaultNaiveSortMaker implements NaiveSortMaker
@@ -61,6 +62,7 @@ public DefaultNaiveSorter(ArrayList<ColumnWithDirection> ordering)
     }
 
+    @Nullable
     @Override
     public RowsAndColumns moreData(RowsAndColumns rac)
     {
@@ -75,7 +77,12 @@ public RowsAndColumns complete()
         return new EmptyRowsAndColumns();
       }
 
-      ConcatRowsAndColumns rac = new ConcatRowsAndColumns(racBuffer);
+      final RowsAndColumns rac;
+      if (racBuffer.size() == 1) {
+        rac = racBuffer.get(0);
+      } else {
+        rac = new ConcatRowsAndColumns(racBuffer);
+      }
 
       // One int for the racBuffer, another for the rowIndex
       int[] sortedPointers = new int[rac.numRows()];
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
index a04583a3b32d..6425e12def4d 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
@@ -62,6 +62,7 @@
 /**
  * Helps tests make segments.
 */
+@SuppressWarnings({"NotNullFieldNotInitialized", "FieldMayBeFinal", "ConstantConditions", "NullableProblems"})
 public class IndexBuilder
 {
   private static final int ROWS_PER_INDEX_FOR_MERGING = 1;
@@ -261,7 +262,7 @@ public File buildMMappedIndexFile()
           tmpDir,
           schema.getDimensionsSpec(),
           indexSpec,
-          Integer.MAX_VALUE
+          -1
       );
     }
     catch (IOException e) {
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
index 84f920878f15..762ae621d1fd 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
@@ -109,7 +109,6 @@ public DrillWindowQueryTest(
     this.filename = filename;
   }
 
-  @SuppressWarnings({"unchecked", "rawtypes"})
   @Override
   public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
       QueryRunnerFactoryConglomerate conglomerate,
@@ -152,7 +151,13 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker(
   @Test
   public void windowQueryTest()
   {
+    Thread thread = null;
+    String oldName = null;
     try {
+      thread = Thread.currentThread();
+      oldName = thread.getName();
+      thread.setName("drillWindowQuery-" + filename);
+
       final String query = getQueryString();
       final String results = getExpectedResults();
 
@@ -160,14 +165,17 @@ public void windowQueryTest()
           .skipVectorize(true)
           .sql(query)
           .queryContext(ImmutableMap.of("windowsAreForClosers", true, "windowsAllTheWayDown", true))
-          .expectedResults((sql, results1) -> {
-            Assert.assertEquals(results, results1);
-          })
+          .expectedResults((sql, results1) -> Assert.assertEquals(results, String.valueOf(results1)))
          .run();
     }
     catch (Throwable e) {
      log.info(e, "Got a throwable, here it is. Ignoring for now.");
     }
+    finally {
+      if (thread != null && oldName != null) {
+        thread.setName(oldName);
+      }
+    }
   }
 
   @Nonnull
@@ -182,6 +190,7 @@ private String getExpectedResults() throws IOException
     return readStringFromResource(".e");
   }
 
+  @SuppressWarnings({"UnstableApiUsage", "ConstantConditions"})
   @Nonnull
   private String readStringFromResource(String s) throws IOException
   {
@@ -192,6 +201,7 @@ private String readStringFromResource(String s) throws IOException
     return query;
   }
 
+  @SuppressWarnings({"rawtypes", "unchecked"})
  private void attachIndex(SpecificSegmentsQuerySegmentWalker texasRanger, String dataSource, DimensionSchema... dims)
      throws IOException
  {
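Editor's note: the thread renaming in windowQueryTest() is a debuggability idiom: tag the current thread with the running test case so thread dumps and log lines identify it, and always restore the original name in a finally block. A plain-JDK sketch of the same idiom as a reusable helper (hypothetical, not from the patch):

```java
// Sketch only: run a task with a temporary, descriptive thread name.
public final class ThreadNames
{
  private ThreadNames() {}

  public static void runNamed(String name, Runnable task)
  {
    final Thread thread = Thread.currentThread();
    final String oldName = thread.getName();
    thread.setName(name);
    try {
      task.run();
    }
    finally {
      // Restore unconditionally so pooled threads don't keep stale names.
      thread.setName(oldName);
    }
  }
}
```

Usage would look like `ThreadNames.runNamed("drillWindowQuery-" + filename, this::runCase)`, which matches what the test does inline.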