From 5138e5179b4f5415393a1489147d3e24d5ea6583 Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Wed, 19 Mar 2025 13:10:16 +0000 Subject: [PATCH 1/3] updates --- .../druid/msq/test/CalciteMSQTestsHelper.java | 52 ++++++++++++++++++- .../org.apache.druid.quidem.QTest/dart1.iq | 41 ++------------- .../SpecificSegmentsQuerySegmentWalker.java | 39 +++++++++++--- .../sql/calcite/util/SqlTestFramework.java | 1 - 4 files changed, 87 insertions(+), 46 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 6a94124e7682..aa835dc6690f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -30,6 +30,8 @@ import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.TypeLiteral; +import org.apache.druid.client.TimelineServerView; +import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.collections.ResourceHolder; import org.apache.druid.data.input.ResourceInputSource; @@ -45,6 +47,7 @@ import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.initialization.DruidModule; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; @@ -58,6 +61,7 @@ import org.apache.druid.query.ForwardingQueryProcessingPool; import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.QueryProcessingPool; +import org.apache.druid.query.TableDataSource; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; import org.apache.druid.query.aggregation.FloatSumAggregatorFactory; @@ -73,6 +77,7 @@ import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexCursorFactory; +import org.apache.druid.segment.ReferenceCountingSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.incremental.IncrementalIndex; @@ -84,6 +89,8 @@ import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; +import org.apache.druid.server.TestClusterQuerySegmentWalker.TestSegmentsBroker; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; import org.apache.druid.sql.calcite.CalciteNestedDataQueryTest; @@ -92,7 +99,14 @@ import org.apache.druid.sql.calcite.util.TestDataBuilder; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.SegmentIdTest; +import org.apache.druid.timeline.TimelineLookup; +import org.apache.druid.timeline.TimelineObjectHolder; +import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.LinearShardSpec; +import org.apache.druid.timeline.partition.PartitionChunk; +import org.apache.druid.timeline.partition.PartitionHolder; +import org.apache.druid.timeline.partition.SingleElementPartitionChunk; import org.easymock.EasyMock; import org.joda.time.Interval; import org.mockito.Mockito; @@ -202,18 +216,52 @@ static class LocalDataSegmentProvider extends CacheLoader cache; + private SpecificSegmentsQuerySegmentWalker walker; + private TestSegmentsBroker testSegmentsBroker; @Inject - public LocalDataSegmentProvider(TempDirProducer tempDirProducer) + public LocalDataSegmentProvider(TempDirProducer tempDirProducer, SpecificSegmentsQuerySegmentWalker walker, + TestSegmentsBroker testSegmentsBroker, + TimelineServerView timelineServerView) { this.tempDirProducer = tempDirProducer; + this.walker = walker; + this.testSegmentsBroker = testSegmentsBroker; + TimelineLookup a = timelineServerView.getTimeline(new TableDataSource("rollup-tutorial")).get(); + List> s = a.lookup(Intervals.ETERNITY); + TimelineObjectHolder aa = s.get(0); + PartitionHolder ph = aa.getObject(); + PartitionChunk c = ph.getChunk(0); + + SingleElementPartitionChunk aas=(SingleElementPartitionChunk) c; + ServerSelector o = aas.getObject(); + DataSegment seg = o.getSegment(); + + extracted(); + + +// VersionedIntervalTimeline d1 = testSegmentsBrokerm.timelines.get("rollup-tutorial"); + + + CompleteSegment segment = walker.getSegment("rollup-tutorial"); this.cache = CacheBuilder.newBuilder().build(this); } + private Segment extracted() + { + VersionedIntervalTimeline versionedIntervalTimeline = testSegmentsBroker.timelines.get("rollup-tutorial"); + TimelineObjectHolder first = versionedIntervalTimeline.first(); + PartitionHolder object = first.getObject(); + PartitionChunk chunk = object.getChunk(0); + ReferenceCountingSegment object2 = chunk.getObject(); + return object2; + } + @Override public CompleteSegment load(SegmentId segmentId) throws Exception { - return getSupplierForSegment(tempDirProducer::newTempFolder, segmentId); + return walker.getSegment(segmentId.getDataSource()); +// return getSupplierForSegment(tempDirProducer::newTempFolder, segmentId); } @Override diff --git a/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart1.iq b/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart1.iq index b8a8415e4f88..65dbce0f087b 100644 --- a/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart1.iq +++ b/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart1.iq @@ -1,46 +1,13 @@ !set dartQueryId 00000000-0000-0000-0000-000000000000 -!use druidtest://?componentSupplier=DartComponentSupplier +!use druidtest://?componentSupplier=StandardMSQComponentSupplier&datasets=sql/src/test/quidem/sampledataset !set outputformat mysql -select * from numfoo; -+-------------------------+------+------+-----------+------+------+------+------+------+-----+-----+--------+--------+-----+-----+-----+--------------------+ -| __time | dim1 | dim2 | dim3 | dim4 | dim5 | dim6 | dbl1 | dbl2 | f1 | f2 | l1 | l2 | cnt | m1 | m2 | unique_dim1 | -+-------------------------+------+------+-----------+------+------+------+------+------+-----+-----+--------+--------+-----+-----+-----+--------------------+ -| 2000-01-01 00:00:00.000 | | a | ["a","b"] | a | aa | 1 | 1.0 | | 1.0 | | 7 | | 1 | 1.0 | 1.0 | "AQAAAEAAAA==" | -| 2000-01-02 00:00:00.000 | 10.1 | | ["b","c"] | a | ab | 2 | 1.7 | 1.7 | 0.1 | 0.1 | 325323 | 325323 | 1 | 2.0 | 2.0 | "AQAAAQAAAAHNBA==" | -| 2001-01-01 00:00:00.000 | 1 | a | | b | ad | 4 | | | | | | | 1 | 4.0 | 4.0 | "AQAAAQAAAAFREA==" | -| 2001-01-02 00:00:00.000 | def | abc | | b | aa | 5 | | | | | | | 1 | 5.0 | 5.0 | "AQAAAQAAAACyEA==" | -| 2001-01-03 00:00:00.000 | abc | | | b | ab | 6 | | | | | | | 1 | 6.0 | 6.0 | "AQAAAQAAAAEkAQ==" | -| 2000-01-03 00:00:00.000 | 2 | | d | a | ba | 3 | 0.0 | 0.0 | 0.0 | 0.0 | 0 | 0 | 1 | 3.0 | 3.0 | "AQAAAQAAAAOzAg==" | -+-------------------------+------+------+-----------+------+------+------+------+------+-----+-----+--------+--------+-----+-----+-----+--------------------+ -(6 rows) - -!ok -select * from numfoo; -+-------------------------+------+------+-----------+------+------+------+------+------+-----+-----+--------+--------+-----+-----+-----+--------------------+ -| __time | dim1 | dim2 | dim3 | dim4 | dim5 | dim6 | dbl1 | dbl2 | f1 | f2 | l1 | l2 | cnt | m1 | m2 | unique_dim1 | -+-------------------------+------+------+-----------+------+------+------+------+------+-----+-----+--------+--------+-----+-----+-----+--------------------+ -| 2000-01-01 00:00:00.000 | | a | ["a","b"] | a | aa | 1 | 1.0 | | 1.0 | | 7 | | 1 | 1.0 | 1.0 | "AQAAAEAAAA==" | -| 2000-01-02 00:00:00.000 | 10.1 | | ["b","c"] | a | ab | 2 | 1.7 | 1.7 | 0.1 | 0.1 | 325323 | 325323 | 1 | 2.0 | 2.0 | "AQAAAQAAAAHNBA==" | -| 2000-01-03 00:00:00.000 | 2 | | d | a | ba | 3 | 0.0 | 0.0 | 0.0 | 0.0 | 0 | 0 | 1 | 3.0 | 3.0 | "AQAAAQAAAAOzAg==" | -| 2001-01-01 00:00:00.000 | 1 | a | | b | ad | 4 | | | | | | | 1 | 4.0 | 4.0 | "AQAAAQAAAAFREA==" | -| 2001-01-02 00:00:00.000 | def | abc | | b | aa | 5 | | | | | | | 1 | 5.0 | 5.0 | "AQAAAQAAAACyEA==" | -| 2001-01-03 00:00:00.000 | abc | | | b | ab | 6 | | | | | | | 1 | 6.0 | 6.0 | "AQAAAQAAAAEkAQ==" | -+-------------------------+------+------+-----------+------+------+------+------+------+-----+-----+--------+--------+-----+-----+-----+--------------------+ -(6 rows) - -!ok -select length(dim1) from numfoo; +select count(1) from "rollup-tutorial"; +--------+ | EXPR$0 | +--------+ -| 0 | -| 1 | -| 1 | -| 3 | -| 3 | -| 4 | +| 9 | +--------+ -(6 rows) +(1 row) !ok diff --git a/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java index ff385e99f641..0dd9ffab4eb2 100644 --- a/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java +++ b/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java @@ -23,6 +23,8 @@ import com.google.common.collect.Ordering; import com.google.common.io.Closeables; import com.google.inject.Injector; +import org.apache.curator.shaded.com.google.common.collect.Lists; +import org.apache.druid.error.DruidException; import org.apache.druid.query.DataSource; import org.apache.druid.query.FrameBasedInlineDataSource; import org.apache.druid.query.InlineDataSource; @@ -34,6 +36,7 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.lookup.LookupExtractorFactoryContainer; import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.FrameBasedInlineSegmentWrangler; import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.InlineSegmentWrangler; @@ -76,8 +79,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C { private final QuerySegmentWalker walker; private final Map> timelines; - private final List closeables = new ArrayList<>(); - private final List segments = new ArrayList<>(); + private final List segments = new ArrayList<>(); private static final LookupExtractorFactoryContainerProvider LOOKUP_EXTRACTOR_FACTORY_CONTAINER_PROVIDER = new LookupExtractorFactoryContainerProvider() { @@ -185,6 +187,14 @@ public SpecificSegmentsQuerySegmentWalker( public SpecificSegmentsQuerySegmentWalker add(final DataSegment descriptor, final Segment segment) { + return add(new CompleteSegment(descriptor, segment)); + } + + private SpecificSegmentsQuerySegmentWalker add(CompleteSegment completeSegment) + { + DataSegment descriptor = completeSegment.getDataSegment(); + Segment segment = completeSegment.getSegment(); + final ReferenceCountingSegment referenceCountingSegment = ReferenceCountingSegment.wrapSegment( segment, @@ -199,8 +209,7 @@ public SpecificSegmentsQuerySegmentWalker add(final DataSegment descriptor, fina descriptor.getVersion(), descriptor.getShardSpec().createChunk(referenceCountingSegment) ); - segments.add(descriptor); - closeables.add(referenceCountingSegment); + segments.add(completeSegment); return this; } @@ -216,7 +225,7 @@ public SpecificSegmentsQuerySegmentWalker add(final DataSegment descriptor, fina public List getSegments() { - return segments; + return Lists.transform(segments, CompleteSegment::getDataSegment); } @Override @@ -234,7 +243,7 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final @Override public void close() throws IOException { - for (Closeable closeable : closeables) { + for (Closeable closeable : segments) { Closeables.close(closeable, true); } } @@ -253,4 +262,22 @@ public SpecificSegmentsQuerySegmentWalker add(TestDataSet dataset, File tmpDir) indexNumericDims ); } + + public CompleteSegment getSegment(String dataSourceName) + { + List matches = new ArrayList<>(1); + for (CompleteSegment s : segments) { + String dataSource = s.getDataSegment().getDataSource(); + if (dataSource.equals(dataSourceName)) { + matches.add(s); + } + } + if (matches.size() != 1) { + throw DruidException.defensive( + "Datasource [%s] has either no segments or more than one - neither is supported right now [%s].", dataSourceName, + matches + ); + } + return matches.get(0); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java index d88af75df644..e99550d08c15 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java @@ -963,7 +963,6 @@ private SystemSchema makeSystemSchema(AuthorizerMapper authorizerMapper, DruidSc return CalciteTests.createMockSystemSchema(druidSchema, timelineServerView, authorizerMapper); } - @Provides @LazySingleton private TimelineServerView makeTimelineServerView(SpecificSegmentsQuerySegmentWalker walker) From f38091823ada8eef35a89669682b3ffedf836322 Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Wed, 19 Mar 2025 13:16:52 +0000 Subject: [PATCH 2/3] cleanup --- .../druid/msq/test/CalciteMSQTestsHelper.java | 371 +----------------- .../org.apache.druid.quidem.QTest/dart1.iq | 41 +- .../dart_with_datasets.iq | 13 + .../SpecificSegmentsQuerySegmentWalker.java | 2 +- 4 files changed, 58 insertions(+), 369 deletions(-) create mode 100644 quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart_with_datasets.iq diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index aa835dc6690f..260718be8d32 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -20,9 +20,6 @@ package org.apache.druid.msq.test; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.inject.Binder; @@ -30,14 +27,8 @@ import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.TypeLiteral; -import org.apache.druid.client.TimelineServerView; -import org.apache.druid.client.selector.ServerSelector; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.collections.ResourceHolder; -import org.apache.druid.data.input.ResourceInputSource; -import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.data.input.impl.LongDimensionSchema; -import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.discovery.NodeRole; import org.apache.druid.frame.processor.Bouncer; import org.apache.druid.guice.IndexingServiceTuningConfigModule; @@ -46,9 +37,6 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.initialization.DruidModule; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.msq.counters.ChannelCounters; @@ -59,78 +47,33 @@ import org.apache.druid.msq.querykit.DataSegmentProvider; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ForwardingQueryProcessingPool; -import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.QueryProcessingPool; -import org.apache.druid.query.TableDataSource; -import org.apache.druid.query.aggregation.CountAggregatorFactory; -import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; -import org.apache.druid.query.aggregation.FloatSumAggregatorFactory; -import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.segment.CompleteSegment; -import org.apache.druid.segment.CursorFactory; -import org.apache.druid.segment.IndexBuilder; -import org.apache.druid.segment.IndexSpec; -import org.apache.druid.segment.QueryableIndex; -import org.apache.druid.segment.QueryableIndexCursorFactory; -import org.apache.druid.segment.ReferenceCountingSegment; -import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestIndex; -import org.apache.druid.segment.incremental.IncrementalIndex; -import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; -import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.SegmentManager; import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; -import org.apache.druid.server.TestClusterQuerySegmentWalker.TestSegmentsBroker; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; -import org.apache.druid.sql.calcite.CalciteNestedDataQueryTest; import org.apache.druid.sql.calcite.TempDirProducer; -import org.apache.druid.sql.calcite.util.CalciteTests; -import org.apache.druid.sql.calcite.util.TestDataBuilder; -import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.SegmentIdTest; -import org.apache.druid.timeline.TimelineLookup; -import org.apache.druid.timeline.TimelineObjectHolder; -import org.apache.druid.timeline.VersionedIntervalTimeline; -import org.apache.druid.timeline.partition.LinearShardSpec; -import org.apache.druid.timeline.partition.PartitionChunk; -import org.apache.druid.timeline.partition.PartitionHolder; -import org.apache.druid.timeline.partition.SingleElementPartitionChunk; import org.easymock.EasyMock; -import org.joda.time.Interval; import org.mockito.Mockito; -import javax.annotation.Nullable; import java.io.File; import java.util.List; import java.util.Set; -import java.util.UUID; import java.util.function.Function; import java.util.function.Supplier; -import static org.apache.druid.sql.calcite.util.CalciteTests.ARRAYS_DATASOURCE; -import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1; -import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2; -import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3; -import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE5; -import static org.apache.druid.sql.calcite.util.CalciteTests.WIKIPEDIA; -import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_LOTS_O_COLUMNS; -import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS; -import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1; -import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1_WITH_NUMERIC_DIMS; -import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS2; -import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS_LOTS_OF_COLUMNS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; @@ -212,63 +155,23 @@ public DataSegmentProvider provideDataSegmentProvider(LocalDataSegmentProvider l } @LazySingleton - static class LocalDataSegmentProvider extends CacheLoader implements DataSegmentProvider + static class LocalDataSegmentProvider implements DataSegmentProvider { - private TempDirProducer tempDirProducer; - private LoadingCache cache; private SpecificSegmentsQuerySegmentWalker walker; - private TestSegmentsBroker testSegmentsBroker; @Inject - public LocalDataSegmentProvider(TempDirProducer tempDirProducer, SpecificSegmentsQuerySegmentWalker walker, - TestSegmentsBroker testSegmentsBroker, - TimelineServerView timelineServerView) + public LocalDataSegmentProvider(SpecificSegmentsQuerySegmentWalker walker) { - this.tempDirProducer = tempDirProducer; this.walker = walker; - this.testSegmentsBroker = testSegmentsBroker; - TimelineLookup a = timelineServerView.getTimeline(new TableDataSource("rollup-tutorial")).get(); - List> s = a.lookup(Intervals.ETERNITY); - TimelineObjectHolder aa = s.get(0); - PartitionHolder ph = aa.getObject(); - PartitionChunk c = ph.getChunk(0); - - SingleElementPartitionChunk aas=(SingleElementPartitionChunk) c; - ServerSelector o = aas.getObject(); - DataSegment seg = o.getSegment(); - - extracted(); - - -// VersionedIntervalTimeline d1 = testSegmentsBrokerm.timelines.get("rollup-tutorial"); - - - CompleteSegment segment = walker.getSegment("rollup-tutorial"); - this.cache = CacheBuilder.newBuilder().build(this); - } - - private Segment extracted() - { - VersionedIntervalTimeline versionedIntervalTimeline = testSegmentsBroker.timelines.get("rollup-tutorial"); - TimelineObjectHolder first = versionedIntervalTimeline.first(); - PartitionHolder object = first.getObject(); - PartitionChunk chunk = object.getChunk(0); - ReferenceCountingSegment object2 = chunk.getObject(); - return object2; - } - - @Override - public CompleteSegment load(SegmentId segmentId) throws Exception - { - return walker.getSegment(segmentId.getDataSource()); -// return getSupplierForSegment(tempDirProducer::newTempFolder, segmentId); } @Override - public Supplier> fetchSegment(SegmentId segmentId, - ChannelCounters channelCounters, boolean isReindex) + public Supplier> fetchSegment( + SegmentId segmentId, + ChannelCounters channelCounters, + boolean isReindex) { - CompleteSegment a = cache.getUnchecked(segmentId); + CompleteSegment a = walker.getSegment(segmentId.getDataSource()); return () -> new ReferenceCountingResourceHolder<>(a, Closer.create()); } @@ -320,264 +223,4 @@ private static DataServerQueryHandlerFactory getTestDataServerQueryHandlerFactor doReturn(dataServerQueryHandler).when(mockFactory).createDataServerQueryHandler(anyString(), any(), any()); return mockFactory; } - - protected static CompleteSegment getSupplierForSegment( - Function tempFolderProducer, - SegmentId segmentId - ) - { - final QueryableIndex index; - switch (segmentId.getDataSource()) { - case WIKIPEDIA: - try { - final File directory = new File(tempFolderProducer.apply("tmpDir"), StringUtils.format("wikipedia-index-%s", UUID.randomUUID())); - final IncrementalIndex incrementalIndex = TestIndex.makeWikipediaIncrementalIndex(); - TestIndex.INDEX_MERGER.persist(incrementalIndex, directory, IndexSpec.DEFAULT, null); - index = TestIndex.INDEX_IO.loadIndex(directory); - } - catch (Exception e) { - throw new RuntimeException(e); - } - break; - case DATASOURCE1: - IncrementalIndexSchema foo1Schema = new IncrementalIndexSchema.Builder() - .withMetrics( - new CountAggregatorFactory("cnt"), - new FloatSumAggregatorFactory("m1", "m1"), - new DoubleSumAggregatorFactory("m2", "m2"), - new HyperUniquesAggregatorFactory("unique_dim1", "dim1") - ) - .withRollup(false) - .build(); - index = IndexBuilder - .create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema(foo1Schema) - .rows(ROWS1) - .buildMMappedIndex(); - break; - case DATASOURCE2: - final IncrementalIndexSchema indexSchemaDifferentDim3M1Types = new IncrementalIndexSchema.Builder() - .withDimensionsSpec( - new DimensionsSpec( - ImmutableList.of( - new StringDimensionSchema("dim1"), - new StringDimensionSchema("dim2"), - new LongDimensionSchema("dim3") - ) - ) - ) - .withMetrics( - new CountAggregatorFactory("cnt"), - new LongSumAggregatorFactory("m1", "m1"), - new DoubleSumAggregatorFactory("m2", "m2"), - new HyperUniquesAggregatorFactory("unique_dim1", "dim1") - ) - .withRollup(false) - .build(); - index = IndexBuilder - .create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema(indexSchemaDifferentDim3M1Types) - .rows(ROWS2) - .buildMMappedIndex(); - break; - case DATASOURCE3: - case CalciteTests.BROADCAST_DATASOURCE: - index = IndexBuilder - .create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema(INDEX_SCHEMA_NUMERIC_DIMS) - .rows(ROWS1_WITH_NUMERIC_DIMS) - .buildMMappedIndex(); - break; - case DATASOURCE5: - index = IndexBuilder - .create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema(INDEX_SCHEMA_LOTS_O_COLUMNS) - .rows(ROWS_LOTS_OF_COLUMNS) - .buildMMappedIndex(); - break; - case ARRAYS_DATASOURCE: - index = IndexBuilder.create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec()) - .withDimensionsSpec(NestedDataTestUtils.AUTO_SCHEMA.getDimensionsSpec()) - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withRollup(false) - .build() - ) - .inputSource( - ResourceInputSource.of( - NestedDataTestUtils.class.getClassLoader(), - NestedDataTestUtils.ARRAY_TYPES_DATA_FILE - ) - ) - .inputFormat(TestDataBuilder.DEFAULT_JSON_INPUT_FORMAT) - .inputTmpDir(tempFolderProducer.apply("tmpDir")) - .buildMMappedIndex(); - break; - case CalciteNestedDataQueryTest.DATA_SOURCE: - case CalciteNestedDataQueryTest.DATA_SOURCE_MIXED: - if (segmentId.getPartitionNum() == 0) { - index = IndexBuilder.create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withDimensionsSpec(CalciteNestedDataQueryTest.ALL_JSON_COLUMNS.getDimensionsSpec()) - .withRollup(false) - .build() - ) - .rows(CalciteNestedDataQueryTest.ROWS) - .buildMMappedIndex(); - } else if (segmentId.getPartitionNum() == 1) { - index = IndexBuilder.create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withDimensionsSpec(CalciteNestedDataQueryTest.JSON_AND_SCALAR_MIX.getDimensionsSpec()) - .withRollup(false) - .build() - ) - .rows(CalciteNestedDataQueryTest.ROWS_MIX) - .buildMMappedIndex(); - } else { - throw new ISE("Cannot query segment %s in test runner", segmentId); - } - break; - case CalciteNestedDataQueryTest.DATA_SOURCE_MIXED_2: - if (segmentId.getPartitionNum() == 0) { - index = IndexBuilder.create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withDimensionsSpec(CalciteNestedDataQueryTest.JSON_AND_SCALAR_MIX.getDimensionsSpec()) - .withRollup(false) - .build() - ) - .rows(CalciteNestedDataQueryTest.ROWS_MIX) - .buildMMappedIndex(); - } else if (segmentId.getPartitionNum() == 1) { - index = IndexBuilder.create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withDimensionsSpec(CalciteNestedDataQueryTest.ALL_JSON_COLUMNS.getDimensionsSpec()) - .withRollup(false) - .build() - ) - .rows(CalciteNestedDataQueryTest.ROWS) - .buildMMappedIndex(); - } else { - throw new ISE("Cannot query segment %s in test runner", segmentId); - } - break; - case CalciteNestedDataQueryTest.DATA_SOURCE_ALL: - index = IndexBuilder.create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec()) - .withDimensionsSpec(NestedDataTestUtils.AUTO_SCHEMA.getDimensionsSpec()) - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withRollup(false) - .build() - ) - .inputSource( - ResourceInputSource.of( - NestedDataTestUtils.class.getClassLoader(), - NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE - ) - ) - .inputFormat(TestDataBuilder.DEFAULT_JSON_INPUT_FORMAT) - .inputTmpDir(tempFolderProducer.apply("tmpDir")) - .buildMMappedIndex(); - break; - case CalciteTests.WIKIPEDIA_FIRST_LAST: - index = TestDataBuilder.makeWikipediaIndexWithAggregation(tempFolderProducer.apply("tmpDir")); - break; - case CalciteTests.TBL_WITH_NULLS_PARQUET: - case CalciteTests.SML_TBL_PARQUET: - case CalciteTests.ALL_TYPES_UNIQ_PARQUET: - case CalciteTests.FEW_ROWS_ALL_DATA_PARQUET: - case CalciteTests.T_ALL_TYPE_PARQUET: - index = TestDataBuilder.getQueryableIndexForDrillDatasource(segmentId.getDataSource(), tempFolderProducer.apply("tmpDir")); - break; - case CalciteTests.BENCHMARK_DATASOURCE: - index = TestDataBuilder.getQueryableIndexForBenchmarkDatasource(); - break; - default: - throw new ISE("Cannot query segment %s in test runner", segmentId); - - } - Segment segment = new Segment() - { - @Override - public SegmentId getId() - { - return segmentId; - } - - @Override - public Interval getDataInterval() - { - return segmentId.getInterval(); - } - - @Nullable - @Override - public QueryableIndex asQueryableIndex() - { - return index; - } - - @Override - public CursorFactory asCursorFactory() - { - return new QueryableIndexCursorFactory(index); - } - - @Override - public void close() - { - } - }; - DataSegment dataSegment = DataSegment.builder() - .dataSource(segmentId.getDataSource()) - .interval(segmentId.getInterval()) - .version(segmentId.getVersion()) - .shardSpec(new LinearShardSpec(0)) - .size(0) - .build(); - return new CompleteSegment(dataSegment, segment); - } } diff --git a/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart1.iq b/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart1.iq index 65dbce0f087b..b8a8415e4f88 100644 --- a/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart1.iq +++ b/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart1.iq @@ -1,13 +1,46 @@ !set dartQueryId 00000000-0000-0000-0000-000000000000 -!use druidtest://?componentSupplier=StandardMSQComponentSupplier&datasets=sql/src/test/quidem/sampledataset +!use druidtest://?componentSupplier=DartComponentSupplier !set outputformat mysql -select count(1) from "rollup-tutorial"; +select * from numfoo; ++-------------------------+------+------+-----------+------+------+------+------+------+-----+-----+--------+--------+-----+-----+-----+--------------------+ +| __time | dim1 | dim2 | dim3 | dim4 | dim5 | dim6 | dbl1 | dbl2 | f1 | f2 | l1 | l2 | cnt | m1 | m2 | unique_dim1 | ++-------------------------+------+------+-----------+------+------+------+------+------+-----+-----+--------+--------+-----+-----+-----+--------------------+ +| 2000-01-01 00:00:00.000 | | a | ["a","b"] | a | aa | 1 | 1.0 | | 1.0 | | 7 | | 1 | 1.0 | 1.0 | "AQAAAEAAAA==" | +| 2000-01-02 00:00:00.000 | 10.1 | | ["b","c"] | a | ab | 2 | 1.7 | 1.7 | 0.1 | 0.1 | 325323 | 325323 | 1 | 2.0 | 2.0 | "AQAAAQAAAAHNBA==" | +| 2001-01-01 00:00:00.000 | 1 | a | | b | ad | 4 | | | | | | | 1 | 4.0 | 4.0 | "AQAAAQAAAAFREA==" | +| 2001-01-02 00:00:00.000 | def | abc | | b | aa | 5 | | | | | | | 1 | 5.0 | 5.0 | "AQAAAQAAAACyEA==" | +| 2001-01-03 00:00:00.000 | abc | | | b | ab | 6 | | | | | | | 1 | 6.0 | 6.0 | "AQAAAQAAAAEkAQ==" | +| 2000-01-03 00:00:00.000 | 2 | | d | a | ba | 3 | 0.0 | 0.0 | 0.0 | 0.0 | 0 | 0 | 1 | 3.0 | 3.0 | "AQAAAQAAAAOzAg==" | ++-------------------------+------+------+-----------+------+------+------+------+------+-----+-----+--------+--------+-----+-----+-----+--------------------+ +(6 rows) + +!ok +select * from numfoo; ++-------------------------+------+------+-----------+------+------+------+------+------+-----+-----+--------+--------+-----+-----+-----+--------------------+ +| __time | dim1 | dim2 | dim3 | dim4 | dim5 | dim6 | dbl1 | dbl2 | f1 | f2 | l1 | l2 | cnt | m1 | m2 | unique_dim1 | ++-------------------------+------+------+-----------+------+------+------+------+------+-----+-----+--------+--------+-----+-----+-----+--------------------+ +| 2000-01-01 00:00:00.000 | | a | ["a","b"] | a | aa | 1 | 1.0 | | 1.0 | | 7 | | 1 | 1.0 | 1.0 | "AQAAAEAAAA==" | +| 2000-01-02 00:00:00.000 | 10.1 | | ["b","c"] | a | ab | 2 | 1.7 | 1.7 | 0.1 | 0.1 | 325323 | 325323 | 1 | 2.0 | 2.0 | "AQAAAQAAAAHNBA==" | +| 2000-01-03 00:00:00.000 | 2 | | d | a | ba | 3 | 0.0 | 0.0 | 0.0 | 0.0 | 0 | 0 | 1 | 3.0 | 3.0 | "AQAAAQAAAAOzAg==" | +| 2001-01-01 00:00:00.000 | 1 | a | | b | ad | 4 | | | | | | | 1 | 4.0 | 4.0 | "AQAAAQAAAAFREA==" | +| 2001-01-02 00:00:00.000 | def | abc | | b | aa | 5 | | | | | | | 1 | 5.0 | 5.0 | "AQAAAQAAAACyEA==" | +| 2001-01-03 00:00:00.000 | abc | | | b | ab | 6 | | | | | | | 1 | 6.0 | 6.0 | "AQAAAQAAAAEkAQ==" | ++-------------------------+------+------+-----------+------+------+------+------+------+-----+-----+--------+--------+-----+-----+-----+--------------------+ +(6 rows) + +!ok +select length(dim1) from numfoo; +--------+ | EXPR$0 | +--------+ -| 9 | +| 0 | +| 1 | +| 1 | +| 3 | +| 3 | +| 4 | +--------+ -(1 row) +(6 rows) !ok diff --git a/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart_with_datasets.iq b/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart_with_datasets.iq new file mode 100644 index 000000000000..65dbce0f087b --- /dev/null +++ b/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart_with_datasets.iq @@ -0,0 +1,13 @@ +!set dartQueryId 00000000-0000-0000-0000-000000000000 +!use druidtest://?componentSupplier=StandardMSQComponentSupplier&datasets=sql/src/test/quidem/sampledataset +!set outputformat mysql + +select count(1) from "rollup-tutorial"; ++--------+ +| EXPR$0 | ++--------+ +| 9 | ++--------+ +(1 row) + +!ok diff --git a/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java index 0dd9ffab4eb2..a9960ba8a31d 100644 --- a/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java +++ b/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java @@ -190,7 +190,7 @@ public SpecificSegmentsQuerySegmentWalker add(final DataSegment descriptor, fina return add(new CompleteSegment(descriptor, segment)); } - private SpecificSegmentsQuerySegmentWalker add(CompleteSegment completeSegment) + public SpecificSegmentsQuerySegmentWalker add(CompleteSegment completeSegment) { DataSegment descriptor = completeSegment.getDataSegment(); Segment segment = completeSegment.getSegment(); From 5428225986831c7f1795611ff2eef719dbed798f Mon Sep 17 00:00:00 2001 From: Zoltan Haindrich Date: Wed, 19 Mar 2025 14:14:59 +0000 Subject: [PATCH 3/3] compare by segmentId --- .../apache/druid/msq/test/CalciteMSQTestsHelper.java | 2 +- .../server/SpecificSegmentsQuerySegmentWalker.java | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 260718be8d32..58f5a7b30624 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -171,7 +171,7 @@ public Supplier> fetchSegment( ChannelCounters channelCounters, boolean isReindex) { - CompleteSegment a = walker.getSegment(segmentId.getDataSource()); + CompleteSegment a = walker.getSegment(segmentId); return () -> new ReferenceCountingResourceHolder<>(a, Closer.create()); } diff --git a/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java index a9960ba8a31d..7cc21ecd083c 100644 --- a/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java +++ b/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java @@ -54,6 +54,7 @@ import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.sql.calcite.util.datasets.TestDataSet; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.LinearShardSpec; import org.joda.time.Interval; @@ -263,18 +264,19 @@ public SpecificSegmentsQuerySegmentWalker add(TestDataSet dataset, File tmpDir) ); } - public CompleteSegment getSegment(String dataSourceName) + public CompleteSegment getSegment(SegmentId segmentId) { List matches = new ArrayList<>(1); for (CompleteSegment s : segments) { - String dataSource = s.getDataSegment().getDataSource(); - if (dataSource.equals(dataSourceName)) { + SegmentId id = s.getDataSegment().getId(); + if (id.equals(segmentId)) { matches.add(s); } } if (matches.size() != 1) { throw DruidException.defensive( - "Datasource [%s] has either no segments or more than one - neither is supported right now [%s].", dataSourceName, + "SegmentId [%s] has unexpected number of matches! [%s]", + segmentId, matches ); }