Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ int findWorkerForServerSelector(final ServerSelector serverSelector, final int m
return UNKNOWN;
}

final String serverHostAndPort = server.getServer().getHostAndPort();
final String serverHostAndPort = server.getServer().getHost();
final int workerNumber = workerIdToNumber.getInt(serverHostAndPort);

// The worker number may be UNKNOWN in a race condition, such as the set of Historicals changing while
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,8 @@ private ResultAndChannels<?> gatherResultKeyStatistics(final OutputChannels chan
{
final StageDefinition stageDefinition = workOrder.getStageDefinition();
final List<OutputChannel> retVal = new ArrayList<>();
final List<KeyStatisticsCollectionProcessor> processors = new ArrayList<>();
final int numOutputChannels = channels.getAllChannels().size();
final List<KeyStatisticsCollectionProcessor> processors = new ArrayList<>(numOutputChannels);

for (final OutputChannel outputChannel : channels.getAllChannels()) {
final BlockingQueueFrameChannel channel = BlockingQueueFrameChannel.minimal();
Expand All @@ -1037,7 +1038,9 @@ private ResultAndChannels<?> gatherResultKeyStatistics(final OutputChannels chan
stageDefinition.getFrameReader(),
stageDefinition.getClusterBy(),
stageDefinition.createResultKeyStatisticsCollector(
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes()
// Divide by two: half for the per-processor collectors together, half for the combined collector.
// Then divide by numOutputChannels: one portion per processor.
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes() / 2 / numOutputChannels
)
)
);
Expand All @@ -1049,7 +1052,9 @@ private ResultAndChannels<?> gatherResultKeyStatistics(final OutputChannels chan
ProcessorManagers.of(processors)
.withAccumulation(
stageDefinition.createResultKeyStatisticsCollector(
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes()
// Divide by two: half for the per-processor collectors, half for the
// combined collector.
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes() / 2
),
ClusterByStatisticsCollector::addAll
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public static WorkerMemoryParameters createInstance(
frameSize,
superSorterConcurrentProcessors,
superSorterMaxChannelsPerMerger,
Math.min(Integer.MAX_VALUE, partitionStatsMemory / numProcessingThreads),
partitionStatsMemory,
hasBroadcastInputs ? computeBroadcastBufferMemory(bundleMemory) : 0
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.collect.Iterables;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.FrameWithPartition;
import org.apache.druid.frame.channel.ReadableFrameChannel;
Expand Down Expand Up @@ -64,7 +65,6 @@
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.CompleteSegment;
Expand Down Expand Up @@ -312,13 +312,14 @@ protected ReturnOrAwait<Unit> runWithInputChannel(
);
}

if (!Intervals.ONLY_ETERNITY.equals(query.getIntervals())) {
// runWithInputChannel is for running on subquery results, where we don't expect to see "intervals" set.
// The SQL planner avoids it for subqueries; see DruidQuery#canUseIntervalFiltering.
throw DruidException.defensive("Expected eternity intervals, but got[%s]", query.getIntervals());
}

final CursorHolder nextCursorHolder =
cursorFactory.makeCursorHolder(
ScanQueryEngine.makeCursorBuildSpec(
query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)),
null
)
);
cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null));
final Cursor nextCursor = nextCursorHolder.asCursor();

if (nextCursor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,13 @@ public void addAll(DistinctKeyCollector other)

if (retainedKeys.isEmpty()) {
this.spaceReductionFactor = other.spaceReductionFactor;
}

for (final Object2LongMap.Entry<RowKey> otherEntry : other.retainedKeys.object2LongEntrySet()) {
add(otherEntry.getKey(), otherEntry.getLongValue());
this.retainedKeys.putAll(other.retainedKeys);
this.maxBytes = other.maxBytes;
this.totalWeightUnadjusted = other.totalWeightUnadjusted;
} else {
for (final Object2LongMap.Entry<RowKey> otherEntry : other.retainedKeys.object2LongEntrySet()) {
add(otherEntry.getKey(), otherEntry.getLongValue());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ public void addAll(QuantilesSketchKeyCollector other)
double otherBytesCount = other.averageKeyLength * other.getSketch().getN();
averageKeyLength = ((sketchBytesCount + otherBytesCount) / (sketch.getN() + other.sketch.getN()));

union.union(sketch);
if (!sketch.isEmpty()) {
union.union(sketch);
}
union.union(other.sketch);
sketch = union.getResultAndReset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest
* This makes tests deterministic.
*/
private static final List<DruidServerMetadata> SERVERS = ImmutableList.of(
new DruidServerMetadata("no", "localhost:1001", null, 1, ServerType.HISTORICAL, "__default", 2),
new DruidServerMetadata("no", "localhost:1002", null, 1, ServerType.HISTORICAL, "__default", 1),
new DruidServerMetadata("no", "localhost:1001", null, 1, ServerType.HISTORICAL, "__default", 2), // plaintext
new DruidServerMetadata("no", null, "localhost:1002", 1, ServerType.HISTORICAL, "__default", 1), // TLS
new DruidServerMetadata("no", "localhost:1003", null, 1, ServerType.REALTIME, "__default", 0)
);

Expand All @@ -86,7 +86,7 @@ public class DartTableInputSpecSlicerTest extends InitializedNullHandlingTest
*/
private static final List<String> WORKER_IDS =
SERVERS.stream()
.map(server -> new WorkerId("http", server.getHostAndPort(), QUERY_ID).toString())
.map(server -> new WorkerId("http", server.getHost(), QUERY_ID).toString())
.collect(Collectors.toList());

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void test_1WorkerInJvm_alone_4Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(892_000_000, frameSize, 4, 199, 22_300_000, 0),
new WorkerMemoryParameters(892_000_000, frameSize, 4, 199, 89_200_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand All @@ -111,7 +111,7 @@ public void test_1WorkerInJvm_alone_withBroadcast_4Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(592_000_000, frameSize, 4, 132, 14_800_000, 200_000_000),
new WorkerMemoryParameters(592_000_000, frameSize, 4, 132, 59_200_000, 200_000_000),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand Down Expand Up @@ -145,7 +145,7 @@ public void test_1WorkerInJvm_alone_2ConcurrentStages_4Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(392_000_000, frameSize, 4, 87, 9_800_000, 0),
new WorkerMemoryParameters(392_000_000, frameSize, 4, 87, 39_200_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
);
}
Expand All @@ -162,7 +162,7 @@ public void test_1WorkerInJvm_alone_2ConcurrentStages_4Threads_highHeap()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(2_392_000_000L, frameSize, 4, 537, 59_800_000, 0),
new WorkerMemoryParameters(2_392_000_000L, frameSize, 4, 537, 239_200_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
);
}
Expand All @@ -179,7 +179,7 @@ public void test_1WorkerInJvm_alone_32Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(136_000_000, frameSize, 32, 2, 425_000, 0),
new WorkerMemoryParameters(136_000_000, frameSize, 32, 2, 13_600_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand All @@ -196,7 +196,7 @@ public void test_1WorkerInJvm_alone_33Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(109_000_000, frameSize, 32, 2, 330_303, 0),
new WorkerMemoryParameters(109_000_000, frameSize, 32, 2, 10_900_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand Down Expand Up @@ -276,7 +276,7 @@ public void test_1WorkerInJvm_alone_40Threads_memoryFromError()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(13_000_000, frameSize, 1, 2, 250_000, 0),
new WorkerMemoryParameters(13_000_000, frameSize, 1, 2, 10_000_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand Down Expand Up @@ -325,7 +325,7 @@ public void test_1WorkerInJvm_alone_40Threads_2ConcurrentStages_memoryFromError(
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(13_000_000, frameSize, 1, 2, 250_000, 0),
new WorkerMemoryParameters(13_000_000, frameSize, 1, 2, 10_000_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
);
}
Expand All @@ -342,7 +342,7 @@ public void test_1WorkerInJvm_200WorkersInCluster_4Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(1_096_000_000, frameSize, 4, 245, 27_400_000, 0),
new WorkerMemoryParameters(1_096_000_000, frameSize, 4, 245, 109_600_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand All @@ -359,7 +359,7 @@ public void test_1WorkerInJvm_200WorkersInCluster_4Threads_2OutputPartitions()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(1_548_000_000, frameSize, 4, 347, 38_700_000, 0),
new WorkerMemoryParameters(1_548_000_000, frameSize, 4, 347, 154_800_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand All @@ -376,7 +376,7 @@ public void test_1WorkerInJvm_200WorkersInCluster_2ConcurrentStages_4Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(96_000_000, frameSize, 4, 20, 2_500_000, 0),
new WorkerMemoryParameters(96_000_000, frameSize, 4, 20, 10_000_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
);
}
Expand All @@ -393,7 +393,7 @@ public void test_12WorkersInJvm_200WorkersInCluster_64Threads_4OutputPartitions(
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(1_762_666_666, frameSize, 64, 23, 2_754_166, 0),
new WorkerMemoryParameters(1_762_666_666, frameSize, 64, 23, 176_266_666, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand All @@ -410,7 +410,7 @@ public void test_12WorkersInJvm_200WorkersInCluster_2ConcurrentStages_64Threads_
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(429_333_333, frameSize, 64, 5, 670_833, 0),
new WorkerMemoryParameters(429_333_333, frameSize, 64, 5, 42_933_333, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
);
}
Expand All @@ -428,7 +428,7 @@ public void test_1WorkerInJvm_MaxWorkersInCluster_2ConcurrentStages_2Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(448_000_000, frameSize, 2, 200, 22_400_000, 0),
new WorkerMemoryParameters(448_000_000, frameSize, 2, 200, 44_800_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory;
import org.apache.druid.timeline.SegmentId;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -177,7 +180,7 @@ public void test_runWithInputChannel() throws Exception
}
}

// put funny intervals on query to ensure it is adjusted to the segment interval before building cursor
// put funny intervals on query to ensure it is validated before building cursor
final ScanQuery query =
Druids.newScanQueryBuilder()
.dataSource("test")
Expand Down Expand Up @@ -240,11 +243,16 @@ public void close()
FrameReader.create(signature)
);

FrameTestUtil.assertRowsEqual(
FrameTestUtil.readRowsFromCursorFactory(cursorFactory, signature, false),
rowsFromProcessor
final RuntimeException e = Assert.assertThrows(
RuntimeException.class,
rowsFromProcessor::toList
);

Assert.assertEquals(Unit.instance(), retVal.get());
MatcherAssert.assertThat(
e,
ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString(
"Expected eternity intervals, but got[[2001-01-01T00:00:00.000Z/2011-01-01T00:00:00.000Z, "
+ "2011-01-02T00:00:00.000Z/2021-01-01T00:00:00.000Z]]"))
);
}
}
2 changes: 1 addition & 1 deletion licenses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2953,7 +2953,7 @@ name: Apache Avro
license_category: binary
module: extensions/druid-avro-extensions
license_name: Apache License version 2.0
version: 1.11.3
version: 1.11.4
libraries:
- org.apache.avro: avro
- org.apache.avro: avro-mapred
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
<gson.version>2.10.1</gson.version>
<scala.library.version>2.13.14</scala.library.version>
<avatica.version>1.25.0</avatica.version>
<avro.version>1.11.3</avro.version>
<avro.version>1.11.4</avro.version>
<!--
The base calcite parser was copied into the project; when updating Calcite run dev/upgrade-calcite-parser to adopt upstream changes
-->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,11 @@ private void runWorkersIfPossible()
@GuardedBy("runWorkersLock")
private void setAllDoneIfPossible()
{
if (isAllDone()) {
// Already done, no need to set allDone again.
return;
}

try {
if (totalInputFrames == 0 && outputPartitionsFuture.isDone()) {
// No input data -- generate empty output channels.
Expand Down
Loading