From 1a86d4441f41a182f10a541129dd989814600a53 Mon Sep 17 00:00:00 2001 From: imply-cheddar Date: Wed, 12 Jul 2023 13:15:32 +0900 Subject: [PATCH 1/2] Fix a resource leak with Window processing Additionally, in order to find the leak, there were adjustments to the StupidPool to track leaks a bit better. It would appear that the pool objects get GC'd during testing for some reason which was causing some incorrect identification of leaks from objects that had been returned but were GC'd along with the pool. --- .../apache/druid/collections/StupidPool.java | 62 +++++++++--- .../operator/LimitTimeIntervalOperator.java | 20 ++-- ...WindowOperatorQueryQueryRunnerFactory.java | 53 +++++++++- .../LazilyDecoratedRowsAndColumns.java | 98 ++++++++++++------- .../query/rowsandcols/RowsAndColumns.java | 31 ++++++ .../query/rowsandcols/SemanticCreator.java | 37 +++++++ .../QueryableIndexRowsAndColumns.java | 14 ++- .../semantic/DefaultNaiveSortMaker.java | 9 +- .../apache/druid/segment/IndexBuilder.java | 3 +- .../sql/calcite/DrillWindowQueryTest.java | 18 +++- 10 files changed, 273 insertions(+), 72 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/rowsandcols/SemanticCreator.java diff --git a/processing/src/main/java/org/apache/druid/collections/StupidPool.java b/processing/src/main/java/org/apache/druid/collections/StupidPool.java index ced36e3a9ddc..06536c5d33fc 100644 --- a/processing/src/main/java/org/apache/druid/collections/StupidPool.java +++ b/processing/src/main/java/org/apache/druid/collections/StupidPool.java @@ -24,12 +24,13 @@ import com.google.common.base.Supplier; import org.apache.druid.java.util.common.Cleaners; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import java.lang.ref.WeakReference; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -105,7 +106,8 @@ public static boolean isPoisoned() private final AtomicLong createdObjectsCounter = new AtomicLong(0); private final AtomicLong leakedObjectsCounter = new AtomicLong(0); - private final AtomicReference capturedException = new AtomicReference<>(null); + private final AtomicReference> capturedException = + new AtomicReference<>(null); //note that this is just the max entries in the cache, pool can still create as many buffers as needed. private final int objectsCacheMaxCount; @@ -149,30 +151,41 @@ public ResourceHolder take() ObjectResourceHolder resourceHolder = objects.poll(); if (resourceHolder == null) { if (POISONED.get() && capturedException.get() != null) { - throw capturedException.get(); + throw makeExceptionForLeaks(capturedException.get()); } return makeObjectWithHandler(); } else { poolSize.decrementAndGet(); if (POISONED.get()) { - final RuntimeException exception = capturedException.get(); - if (exception == null) { - resourceHolder.notifier.except = new RE("Thread[%s]: leaky leak!", Thread.currentThread().getName()); + final CopyOnWriteArrayList exceptionList = capturedException.get(); + if (exceptionList == null) { + resourceHolder.notifier.except = new LeakedException(Thread.currentThread().getName()); } else { - throw exception; + throw makeExceptionForLeaks(exceptionList); } } return resourceHolder; } } + private RuntimeException makeExceptionForLeaks(CopyOnWriteArrayList exceptionList) + { + RuntimeException toThrow = new RuntimeException( + "Leaks happened, each suppressed exception represents one code path that checked out an object and didn't return it." + ); + for (LeakedException exception : exceptionList) { + toThrow.addSuppressed(exception); + } + return toThrow; + } + private ObjectResourceHolder makeObjectWithHandler() { T object = generator.get(); createdObjectsCounter.incrementAndGet(); ObjectId objectId = new ObjectId(); ObjectLeakNotifier notifier = new ObjectLeakNotifier(this, POISONED.get()); - // Using objectId as referent for Cleaner, because if the object itself (e. g. ByteBuffer) is leaked after taken + // Using objectId as referent for Cleaner, because if the object itself (e.g. ByteBuffer) is leaked after taken // from the pool, and the ResourceHolder is not closed, Cleaner won't notify about the leak. return new ObjectResourceHolder(object, objectId, Cleaners.register(objectId, notifier), notifier); } @@ -198,6 +211,7 @@ public long objectsCreatedCount() private void tryReturnToPool(T object, ObjectId objectId, Cleaners.Cleanable cleanable, ObjectLeakNotifier notifier) { long currentPoolSize; + notifier.except = null; do { currentPoolSize = poolSize.get(); if (currentPoolSize >= objectsCacheMaxCount) { @@ -310,14 +324,14 @@ private static class ObjectLeakNotifier implements Runnable final AtomicLong leakedObjectsCounter; final AtomicBoolean disabled = new AtomicBoolean(false); - private RuntimeException except; + private LeakedException except; ObjectLeakNotifier(StupidPool pool, boolean poisoned) { poolReference = new WeakReference<>(pool); leakedObjectsCounter = pool.leakedObjectsCounter; - except = poisoned ? new RE("Thread[%s]: drip drip", Thread.currentThread().getName()) : null; + except = poisoned ? new LeakedException(Thread.currentThread().getName()) : null; } @Override @@ -329,7 +343,12 @@ public void run() final StupidPool pool = poolReference.get(); log.warn("Not closed! Object leaked from %s. Allowing gc to prevent leak.", pool); if (except != null && pool != null) { - pool.capturedException.set(except); + CopyOnWriteArrayList exceptions = pool.capturedException.get(); + if (exceptions == null) { + pool.capturedException.compareAndSet(null, new CopyOnWriteArrayList<>()); + } + exceptions = pool.capturedException.get(); + exceptions.add(except); log.error(except, "notifier[%s], dumping stack trace from object checkout and poisoning pool", this); } } @@ -357,4 +376,25 @@ public void disable() private static class ObjectId { } + + /** + * This exception exists primarily to defer string interpolation to when getMessage is called instead of + * interpolating on constructor. While not a primary bottleneck, the string interpolation for poisoned stupid + * pools does show up in profiling so avoiding it is just good hygiene. + */ + private static class LeakedException extends RuntimeException + { + private final String threadName; + + public LeakedException(String threadName) + { + this.threadName = threadName; + } + + @Override + public String getMessage() + { + return StringUtils.format("Originally checked out by thread [%s]", threadName); + } + } } diff --git a/processing/src/main/java/org/apache/druid/query/operator/LimitTimeIntervalOperator.java b/processing/src/main/java/org/apache/druid/query/operator/LimitTimeIntervalOperator.java index b658335e6a9b..b5768fe9cd22 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/LimitTimeIntervalOperator.java +++ b/processing/src/main/java/org/apache/druid/query/operator/LimitTimeIntervalOperator.java @@ -19,16 +19,13 @@ package org.apache.druid.query.operator; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.query.QueryPlus; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator; import org.joda.time.Interval; import javax.annotation.Nullable; import java.io.Closeable; -import java.util.List; public class LimitTimeIntervalOperator implements Operator { @@ -37,16 +34,11 @@ public class LimitTimeIntervalOperator implements Operator public LimitTimeIntervalOperator( Operator segmentOperator, - QueryPlus queryPlus + Interval interval ) { this.segmentOperator = segmentOperator; - - final List intervals = queryPlus.getQuery().getIntervals(); - if (intervals.size() != 1) { - throw new ISE("Can only handle a single interval, got[%s]", intervals); - } - interval = intervals.get(0); + this.interval = interval; } @Nullable @@ -61,11 +53,13 @@ public Closeable goOrContinue( @Override public Signal push(RowsAndColumns rac) { - final RowsAndColumnsDecorator decor = RowsAndColumnsDecorator.fromRAC(rac); - if (!Intervals.isEternity(interval)) { + if (Intervals.isEternity(interval)) { + return receiver.push(rac); + } else { + final RowsAndColumnsDecorator decor = RowsAndColumnsDecorator.fromRAC(rac); decor.limitTimeRange(interval); + return receiver.push(decor.toRowsAndColumns()); } - return receiver.push(decor.toRowsAndColumns()); } @Override diff --git a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java index d716524bb06c..b58aff80b4fb 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java +++ b/processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryRunnerFactory.java @@ -20,16 +20,25 @@ package org.apache.druid.query.operator; import com.google.common.base.Function; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; import org.apache.druid.query.QueryProcessingPool; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryRunnerFactory; import org.apache.druid.query.QueryToolChest; +import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns; import org.apache.druid.query.rowsandcols.RowsAndColumns; +import org.apache.druid.query.rowsandcols.concrete.FrameRowsAndColumns; +import org.apache.druid.query.rowsandcols.semantic.WireTransferable; import org.apache.druid.segment.Segment; +import org.apache.druid.segment.column.RowSignature; +import org.joda.time.Interval; import javax.annotation.Nullable; +import java.util.List; public class WindowOperatorQueryQueryRunnerFactory implements QueryRunnerFactory { @@ -41,10 +50,21 @@ public QueryRunner createRunner(Segment segment) return (queryPlus, responseContext) -> new OperatorSequence(() -> { Operator op = new SegmentToRowsAndColumnsOperator(segment); - op = new LimitTimeIntervalOperator(op, queryPlus); + + final List intervals = queryPlus.getQuery().getIntervals(); + if (intervals.size() != 1) { + throw DruidException.defensive("Can only handle a single interval, got [%s]", intervals); + } + + final Interval interval = intervals.get(0); + if (!Intervals.isEternity(interval)) { + op = new LimitTimeIntervalOperator(op, interval); + } + for (OperatorFactory leaf : ((WindowOperatorQuery) queryPlus.getQuery()).getLeafOperators()) { op = leaf.wrap(op); } + return op; }); } @@ -58,20 +78,47 @@ public QueryRunner mergeRunners( // This merge is extremely naive, there is no ordering being imposed over the data, nor is there any attempt // to shrink the size of the data before pushing it across the wire. This code implementation is intended more // to make this work for tests and less to work in production. That's why the WindowOperatorQuery forces - // a super-secrete context parameter to be set to actually allow it to run a query that pushes all the way down + // a super-secret context parameter to be set to actually allow it to run a query that pushes all the way down // like this. When this gets fixed, we can remove that parameter. return (queryPlus, responseContext) -> Sequences.concat( Sequences.map( Sequences.simple(queryRunners), new Function, Sequence>() { + @SuppressWarnings("ConstantConditions") @Nullable @Override public Sequence apply( @Nullable QueryRunner input ) { - return input.run(queryPlus, responseContext); + return Sequences.map( + input.run(queryPlus, responseContext), + new Function() + { + @Nullable + @Override + public RowsAndColumns apply(@Nullable RowsAndColumns input) + { + // This is interim code to force a materialization by synthesizing the wire transfer + // that will need to naturally happen as we flesh out this code more. For now, we + // materialize the bytes on-heap and then read them back in as a frame. + if (input instanceof LazilyDecoratedRowsAndColumns) { + final WireTransferable wire = WireTransferable.fromRAC(input); + final byte[] frameBytes = wire.bytesToTransfer(); + + RowSignature.Builder sigBob = RowSignature.builder(); + for (String column : input.getColumnNames()) { + sigBob.add(column, input.findColumn(column).toAccessor().getType()); + } + + return new FrameRowsAndColumns(Frame.wrap(frameBytes), sigBob.build()); + } + return input; + } + } + ); + } } ) diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java index e6ef061d7150..bb2399acd055 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java @@ -29,6 +29,7 @@ import org.apache.druid.frame.write.FrameWriters; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.guava.Sequence; @@ -41,6 +42,7 @@ import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker; import org.apache.druid.query.rowsandcols.semantic.DefaultRowsAndColumnsDecorator; import org.apache.druid.query.rowsandcols.semantic.RowsAndColumnsDecorator; +import org.apache.druid.query.rowsandcols.semantic.WireTransferable; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.StorageAdapter; @@ -56,11 +58,16 @@ import java.util.Collections; import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; public class LazilyDecoratedRowsAndColumns implements RowsAndColumns { + private static final Map, Function> AS_MAP = + RowsAndColumns.makeAsMap(LazilyDecoratedRowsAndColumns.class); + private RowsAndColumns base; private Interval interval; private Filter filter; @@ -118,33 +125,48 @@ public Column findColumn(String name) @Override public T as(Class clazz) { - if (RowsAndColumnsDecorator.class.equals(clazz)) { - // If we don't have a projection defined, then it's safe to continue collecting more decorations as we - // can meaningfully merge them together. - if (viewableColumns == null || viewableColumns.isEmpty()) { - return (T) new DefaultRowsAndColumnsDecorator( - base, - interval, - filter, - virtualColumns, - limit, - ordering - ); + //noinspection ReturnOfNull + return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this); + } + + @SemanticCreator + public RowsAndColumnsDecorator toRowsAndColumnsDecorator() + { + // If we don't have a projection defined, then it's safe to continue collecting more decorations as we + // can meaningfully merge them together. + if (viewableColumns == null || viewableColumns.isEmpty()) { + return new DefaultRowsAndColumnsDecorator(base, interval, filter, virtualColumns, limit, ordering); + } else { + return new DefaultRowsAndColumnsDecorator(this); + } + } + + @SemanticCreator + public WireTransferable toWireTransferable() + { + return () -> { + final Pair materialized = materialize(); + if (materialized == null) { + return new byte[]{}; } else { - return (T) new DefaultRowsAndColumnsDecorator(this); + return materialized.lhs; } - } - return null; + }; } private void maybeMaterialize() { if (!(interval == null && filter == null && limit == -1 && ordering == null)) { - materialize(); + final Pair thePair = materialize(); + if (thePair == null) { + reset(new EmptyRowsAndColumns()); + } else { + reset(new FrameRowsAndColumns(Frame.wrap(thePair.lhs), thePair.rhs)); + } } } - private void materialize() + private Pair materialize() { if (ordering != null) { throw new ISE("Cannot reorder[%s] scan data right now", ordering); @@ -152,13 +174,26 @@ private void materialize() final StorageAdapter as = base.as(StorageAdapter.class); if (as == null) { - reset(naiveMaterialize(base)); + return naiveMaterialize(base); } else { - reset(materializeStorageAdapter(as)); + return materializeStorageAdapter(as); } + } - private RowsAndColumns materializeStorageAdapter(StorageAdapter as) + private void reset(RowsAndColumns rac) + { + base = rac; + interval = null; + filter = null; + virtualColumns = null; + limit = -1; + viewableColumns = null; + ordering = null; + } + + @Nullable + private Pair materializeStorageAdapter(StorageAdapter as) { final Sequence cursors = as.makeCursors( filter, @@ -231,25 +266,15 @@ private RowsAndColumns materializeStorageAdapter(StorageAdapter as) // This means that the accumulate was never called, which can only happen if we didn't have any cursors. // We would only have zero cursors if we essentially didn't match anything, meaning that our RowsAndColumns // should be completely empty. - return new EmptyRowsAndColumns(); + return null; } else { final byte[] bytes = writer.toByteArray(); - return new FrameRowsAndColumns(Frame.wrap(bytes), siggy.get()); + return Pair.of(bytes, siggy.get()); } } - private void reset(RowsAndColumns rac) - { - base = rac; - interval = null; - filter = null; - virtualColumns = null; - limit = -1; - viewableColumns = null; - ordering = null; - } - - private RowsAndColumns naiveMaterialize(RowsAndColumns rac) + @Nullable + private Pair naiveMaterialize(RowsAndColumns rac) { final int numRows = rac.numRows(); @@ -264,7 +289,7 @@ private RowsAndColumns naiveMaterialize(RowsAndColumns rac) // long as is required by the time filter produces all 0s, so either 0 is included and matches all rows or // it's not and we skip all rows. if (!interval.contains(0)) { - return new EmptyRowsAndColumns(); + return null; } } else { final ColumnAccessor accessor = racColumn.toAccessor(); @@ -355,7 +380,6 @@ private RowsAndColumns naiveMaterialize(RowsAndColumns rac) frameWriter.addSelection(); } - return new FrameRowsAndColumns(Frame.wrap(frameWriter.toByteArray()), sigBob.build()); + return Pair.of(frameWriter.toByteArray(), sigBob.build()); } - } diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java index 7b6a1f6215d3..d139265d147d 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/RowsAndColumns.java @@ -19,13 +19,19 @@ package org.apache.druid.query.rowsandcols; +import org.apache.druid.error.DruidException; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Function; /** * An interface representing a chunk of RowsAndColumns. Essentially a RowsAndColumns is just a batch of rows @@ -69,6 +75,31 @@ static AppendableRowsAndColumns expectAppendable(RowsAndColumns input) return retVal; } + static Map, Function> makeAsMap(Class clazz) + { + Map, Function> retVal = new HashMap<>(); + + for (Method method : clazz.getMethods()) { + if (method.isAnnotationPresent(SemanticCreator.class)) { + if (method.getParameterCount() != 0) { + throw DruidException.defensive("Method [%s] annotated with SemanticCreator was not 0-argument.", method); + } + + retVal.put(method.getReturnType(), arg -> { + try { + return method.invoke(arg); + } + catch (InvocationTargetException | IllegalAccessException e) { + throw DruidException.defensive().build(e, "Problem invoking method [%s]", method); + } + }); + } + } + + return retVal; + } + + /** * The set of column names available from the RowsAndColumns * diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/SemanticCreator.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/SemanticCreator.java new file mode 100644 index 000000000000..bb1af0e4d9f1 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/SemanticCreator.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.rowsandcols; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation used to indicate that the method is used as a creator for a semantic interface. + * + * Used in conjuction with {@link RowsAndColumns#makeAsMap(Class)} to build maps for simplified implementation of + * the {@link RowsAndColumns#as(Class)} method. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface SemanticCreator +{ +} diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java index e2dc8fa0a9d0..7c6038b38990 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/QueryableIndexRowsAndColumns.java @@ -19,6 +19,7 @@ package org.apache.druid.query.rowsandcols.concrete; +import org.apache.druid.error.DruidException; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; @@ -28,10 +29,12 @@ import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.column.ColumnHolder; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.IOException; import java.util.Collection; import java.util.HashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -41,6 +44,7 @@ public class QueryableIndexRowsAndColumns implements RowsAndColumns, AutoCloseab private final QueryableIndex index; + private final AtomicBoolean closed = new AtomicBoolean(false); private final Closer closer = Closer.create(); private final AtomicInteger numRows = new AtomicInteger(-1); @@ -72,6 +76,9 @@ public int numRows() @Override public Column findColumn(String name) { + if (closed.get()) { + throw DruidException.defensive("Cannot be accessed after being closed!?"); + } final ColumnHolder columnHolder = index.getColumnHolder(name); if (columnHolder == null) { return null; @@ -83,15 +90,18 @@ public Column findColumn(String name) @SuppressWarnings("unchecked") @Nullable @Override - public T as(Class clazz) + public T as(@Nonnull Class clazz) { + //noinspection ReturnOfNull return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this); } @Override public void close() throws IOException { - closer.close(); + if (closed.compareAndSet(false, true)) { + closer.close(); + } } private static HashMap, Function> makeAsMap() diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java index 45915dd9b5c6..6ce7c258ef17 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultNaiveSortMaker.java @@ -28,6 +28,7 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.ColumnAccessor; +import javax.annotation.Nullable; import java.util.ArrayList; public class DefaultNaiveSortMaker implements NaiveSortMaker @@ -61,6 +62,7 @@ public DefaultNaiveSorter(ArrayList ordering) } + @Nullable @Override public RowsAndColumns moreData(RowsAndColumns rac) { @@ -75,7 +77,12 @@ public RowsAndColumns complete() return new EmptyRowsAndColumns(); } - ConcatRowsAndColumns rac = new ConcatRowsAndColumns(racBuffer); + final RowsAndColumns rac; + if (racBuffer.size() == 1) { + rac = racBuffer.get(0); + } else { + rac = new ConcatRowsAndColumns(racBuffer); + } // One int for the racBuffer, another for the rowIndex int[] sortedPointers = new int[rac.numRows()]; diff --git a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java index a04583a3b32d..6425e12def4d 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java @@ -62,6 +62,7 @@ /** * Helps tests make segments. */ +@SuppressWarnings({"NotNullFieldNotInitialized", "FieldMayBeFinal", "ConstantConditions", "NullableProblems"}) public class IndexBuilder { private static final int ROWS_PER_INDEX_FOR_MERGING = 1; @@ -261,7 +262,7 @@ public File buildMMappedIndexFile() tmpDir, schema.getDimensionsSpec(), indexSpec, - Integer.MAX_VALUE + -1 ); } catch (IOException e) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java index 84f920878f15..762ae621d1fd 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java @@ -109,7 +109,6 @@ public DrillWindowQueryTest( this.filename = filename; } - @SuppressWarnings({"unchecked", "rawtypes"}) @Override public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( QueryRunnerFactoryConglomerate conglomerate, @@ -152,7 +151,13 @@ public SpecificSegmentsQuerySegmentWalker createQuerySegmentWalker( @Test public void windowQueryTest() { + Thread thread = null; + String oldName = null; try { + thread = Thread.currentThread(); + oldName = thread.getName(); + thread.setName("drillWindowQuery-" + filename); + final String query = getQueryString(); final String results = getExpectedResults(); @@ -160,14 +165,17 @@ public void windowQueryTest() .skipVectorize(true) .sql(query) .queryContext(ImmutableMap.of("windowsAreForClosers", true, "windowsAllTheWayDown", true)) - .expectedResults((sql, results1) -> { - Assert.assertEquals(results, results1); - }) + .expectedResults((sql, results1) -> Assert.assertEquals(results, String.valueOf(results1))) .run(); } catch (Throwable e) { log.info(e, "Got a throwable, here it is. Ignoring for now."); } + finally { + if (thread != null && oldName != null) { + thread.setName(oldName); + } + } } @Nonnull @@ -182,6 +190,7 @@ private String getExpectedResults() throws IOException return readStringFromResource(".e"); } + @SuppressWarnings({"UnstableApiUsage", "ConstantConditions"}) @Nonnull private String readStringFromResource(String s) throws IOException { @@ -192,6 +201,7 @@ private String readStringFromResource(String s) throws IOException return query; } + @SuppressWarnings({"rawtypes", "unchecked"}) private void attachIndex(SpecificSegmentsQuerySegmentWalker texasRanger, String dataSource, DimensionSchema... dims) throws IOException { From b925fc23651a5c7715a3e73533ae3e7f85514fbd Mon Sep 17 00:00:00 2001 From: imply-cheddar Date: Wed, 12 Jul 2023 16:44:51 +0900 Subject: [PATCH 2/2] Suppress unused warning --- .../druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java index bb2399acd055..4a62106a557c 100644 --- a/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java +++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java @@ -129,6 +129,7 @@ public T as(Class clazz) return (T) AS_MAP.getOrDefault(clazz, arg -> null).apply(this); } + @SuppressWarnings("unused") @SemanticCreator public RowsAndColumnsDecorator toRowsAndColumnsDecorator() { @@ -141,6 +142,7 @@ public RowsAndColumnsDecorator toRowsAndColumnsDecorator() } } + @SuppressWarnings("unused") @SemanticCreator public WireTransferable toWireTransferable() {