diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 6a94124e7682..58f5a7b30624 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -20,9 +20,6 @@ package org.apache.druid.msq.test; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.inject.Binder; @@ -32,10 +29,6 @@ import com.google.inject.TypeLiteral; import org.apache.druid.collections.ReferenceCountingResourceHolder; import org.apache.druid.collections.ResourceHolder; -import org.apache.druid.data.input.ResourceInputSource; -import org.apache.druid.data.input.impl.DimensionsSpec; -import org.apache.druid.data.input.impl.LongDimensionSchema; -import org.apache.druid.data.input.impl.StringDimensionSchema; import org.apache.druid.discovery.NodeRole; import org.apache.druid.frame.processor.Bouncer; import org.apache.druid.guice.IndexingServiceTuningConfigModule; @@ -44,8 +37,6 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.initialization.DruidModule; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.msq.counters.ChannelCounters; @@ -56,67 +47,33 @@ import org.apache.druid.msq.querykit.DataSegmentProvider; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ForwardingQueryProcessingPool; -import org.apache.druid.query.NestedDataTestUtils; import org.apache.druid.query.QueryProcessingPool; -import org.apache.druid.query.aggregation.CountAggregatorFactory; -import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory; -import org.apache.druid.query.aggregation.FloatSumAggregatorFactory; -import org.apache.druid.query.aggregation.LongSumAggregatorFactory; -import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import org.apache.druid.query.groupby.GroupByQueryConfig; import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.segment.CompleteSegment; -import org.apache.druid.segment.CursorFactory; -import org.apache.druid.segment.IndexBuilder; -import org.apache.druid.segment.IndexSpec; -import org.apache.druid.segment.QueryableIndex; -import org.apache.druid.segment.QueryableIndexCursorFactory; -import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestIndex; -import org.apache.druid.segment.incremental.IncrementalIndex; -import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; -import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.SpecificSegmentsQuerySegmentWalker; import org.apache.druid.server.coordination.DataSegmentAnnouncer; import org.apache.druid.server.coordination.NoopDataSegmentAnnouncer; -import org.apache.druid.sql.calcite.CalciteNestedDataQueryTest; import org.apache.druid.sql.calcite.TempDirProducer; -import org.apache.druid.sql.calcite.util.CalciteTests; -import org.apache.druid.sql.calcite.util.TestDataBuilder; -import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; -import org.apache.druid.timeline.partition.LinearShardSpec; import org.easymock.EasyMock; -import org.joda.time.Interval; import org.mockito.Mockito; -import javax.annotation.Nullable; import java.io.File; import java.util.List; import java.util.Set; -import java.util.UUID; import java.util.function.Function; import java.util.function.Supplier; -import static org.apache.druid.sql.calcite.util.CalciteTests.ARRAYS_DATASOURCE; -import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1; -import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2; -import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3; -import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE5; -import static org.apache.druid.sql.calcite.util.CalciteTests.WIKIPEDIA; -import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_LOTS_O_COLUMNS; -import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS; -import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1; -import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1_WITH_NUMERIC_DIMS; -import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS2; -import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS_LOTS_OF_COLUMNS; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; @@ -198,29 +155,23 @@ public DataSegmentProvider provideDataSegmentProvider(LocalDataSegmentProvider l } @LazySingleton - static class LocalDataSegmentProvider extends CacheLoader implements DataSegmentProvider + static class LocalDataSegmentProvider implements DataSegmentProvider { - private TempDirProducer tempDirProducer; - private LoadingCache cache; + private SpecificSegmentsQuerySegmentWalker walker; @Inject - public LocalDataSegmentProvider(TempDirProducer tempDirProducer) + public LocalDataSegmentProvider(SpecificSegmentsQuerySegmentWalker walker) { - this.tempDirProducer = tempDirProducer; - this.cache = CacheBuilder.newBuilder().build(this); + this.walker = walker; } @Override - public CompleteSegment load(SegmentId segmentId) throws Exception + public Supplier> fetchSegment( + SegmentId segmentId, + ChannelCounters channelCounters, + boolean isReindex) { - return getSupplierForSegment(tempDirProducer::newTempFolder, segmentId); - } - - @Override - public Supplier> fetchSegment(SegmentId segmentId, - ChannelCounters channelCounters, boolean isReindex) - { - CompleteSegment a = cache.getUnchecked(segmentId); + CompleteSegment a = walker.getSegment(segmentId); return () -> new ReferenceCountingResourceHolder<>(a, Closer.create()); } @@ -272,264 +223,4 @@ private static DataServerQueryHandlerFactory getTestDataServerQueryHandlerFactor doReturn(dataServerQueryHandler).when(mockFactory).createDataServerQueryHandler(anyString(), any(), any()); return mockFactory; } - - protected static CompleteSegment getSupplierForSegment( - Function tempFolderProducer, - SegmentId segmentId - ) - { - final QueryableIndex index; - switch (segmentId.getDataSource()) { - case WIKIPEDIA: - try { - final File directory = new File(tempFolderProducer.apply("tmpDir"), StringUtils.format("wikipedia-index-%s", UUID.randomUUID())); - final IncrementalIndex incrementalIndex = TestIndex.makeWikipediaIncrementalIndex(); - TestIndex.INDEX_MERGER.persist(incrementalIndex, directory, IndexSpec.DEFAULT, null); - index = TestIndex.INDEX_IO.loadIndex(directory); - } - catch (Exception e) { - throw new RuntimeException(e); - } - break; - case DATASOURCE1: - IncrementalIndexSchema foo1Schema = new IncrementalIndexSchema.Builder() - .withMetrics( - new CountAggregatorFactory("cnt"), - new FloatSumAggregatorFactory("m1", "m1"), - new DoubleSumAggregatorFactory("m2", "m2"), - new HyperUniquesAggregatorFactory("unique_dim1", "dim1") - ) - .withRollup(false) - .build(); - index = IndexBuilder - .create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema(foo1Schema) - .rows(ROWS1) - .buildMMappedIndex(); - break; - case DATASOURCE2: - final IncrementalIndexSchema indexSchemaDifferentDim3M1Types = new IncrementalIndexSchema.Builder() - .withDimensionsSpec( - new DimensionsSpec( - ImmutableList.of( - new StringDimensionSchema("dim1"), - new StringDimensionSchema("dim2"), - new LongDimensionSchema("dim3") - ) - ) - ) - .withMetrics( - new CountAggregatorFactory("cnt"), - new LongSumAggregatorFactory("m1", "m1"), - new DoubleSumAggregatorFactory("m2", "m2"), - new HyperUniquesAggregatorFactory("unique_dim1", "dim1") - ) - .withRollup(false) - .build(); - index = IndexBuilder - .create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema(indexSchemaDifferentDim3M1Types) - .rows(ROWS2) - .buildMMappedIndex(); - break; - case DATASOURCE3: - case CalciteTests.BROADCAST_DATASOURCE: - index = IndexBuilder - .create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema(INDEX_SCHEMA_NUMERIC_DIMS) - .rows(ROWS1_WITH_NUMERIC_DIMS) - .buildMMappedIndex(); - break; - case DATASOURCE5: - index = IndexBuilder - .create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema(INDEX_SCHEMA_LOTS_O_COLUMNS) - .rows(ROWS_LOTS_OF_COLUMNS) - .buildMMappedIndex(); - break; - case ARRAYS_DATASOURCE: - index = IndexBuilder.create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec()) - .withDimensionsSpec(NestedDataTestUtils.AUTO_SCHEMA.getDimensionsSpec()) - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withRollup(false) - .build() - ) - .inputSource( - ResourceInputSource.of( - NestedDataTestUtils.class.getClassLoader(), - NestedDataTestUtils.ARRAY_TYPES_DATA_FILE - ) - ) - .inputFormat(TestDataBuilder.DEFAULT_JSON_INPUT_FORMAT) - .inputTmpDir(tempFolderProducer.apply("tmpDir")) - .buildMMappedIndex(); - break; - case CalciteNestedDataQueryTest.DATA_SOURCE: - case CalciteNestedDataQueryTest.DATA_SOURCE_MIXED: - if (segmentId.getPartitionNum() == 0) { - index = IndexBuilder.create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withDimensionsSpec(CalciteNestedDataQueryTest.ALL_JSON_COLUMNS.getDimensionsSpec()) - .withRollup(false) - .build() - ) - .rows(CalciteNestedDataQueryTest.ROWS) - .buildMMappedIndex(); - } else if (segmentId.getPartitionNum() == 1) { - index = IndexBuilder.create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withDimensionsSpec(CalciteNestedDataQueryTest.JSON_AND_SCALAR_MIX.getDimensionsSpec()) - .withRollup(false) - .build() - ) - .rows(CalciteNestedDataQueryTest.ROWS_MIX) - .buildMMappedIndex(); - } else { - throw new ISE("Cannot query segment %s in test runner", segmentId); - } - break; - case CalciteNestedDataQueryTest.DATA_SOURCE_MIXED_2: - if (segmentId.getPartitionNum() == 0) { - index = IndexBuilder.create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withDimensionsSpec(CalciteNestedDataQueryTest.JSON_AND_SCALAR_MIX.getDimensionsSpec()) - .withRollup(false) - .build() - ) - .rows(CalciteNestedDataQueryTest.ROWS_MIX) - .buildMMappedIndex(); - } else if (segmentId.getPartitionNum() == 1) { - index = IndexBuilder.create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withDimensionsSpec(CalciteNestedDataQueryTest.ALL_JSON_COLUMNS.getDimensionsSpec()) - .withRollup(false) - .build() - ) - .rows(CalciteNestedDataQueryTest.ROWS) - .buildMMappedIndex(); - } else { - throw new ISE("Cannot query segment %s in test runner", segmentId); - } - break; - case CalciteNestedDataQueryTest.DATA_SOURCE_ALL: - index = IndexBuilder.create() - .tmpDir(tempFolderProducer.apply("tmpDir")) - .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) - .schema( - new IncrementalIndexSchema.Builder() - .withTimestampSpec(NestedDataTestUtils.AUTO_SCHEMA.getTimestampSpec()) - .withDimensionsSpec(NestedDataTestUtils.AUTO_SCHEMA.getDimensionsSpec()) - .withMetrics( - new CountAggregatorFactory("cnt") - ) - .withRollup(false) - .build() - ) - .inputSource( - ResourceInputSource.of( - NestedDataTestUtils.class.getClassLoader(), - NestedDataTestUtils.ALL_TYPES_TEST_DATA_FILE - ) - ) - .inputFormat(TestDataBuilder.DEFAULT_JSON_INPUT_FORMAT) - .inputTmpDir(tempFolderProducer.apply("tmpDir")) - .buildMMappedIndex(); - break; - case CalciteTests.WIKIPEDIA_FIRST_LAST: - index = TestDataBuilder.makeWikipediaIndexWithAggregation(tempFolderProducer.apply("tmpDir")); - break; - case CalciteTests.TBL_WITH_NULLS_PARQUET: - case CalciteTests.SML_TBL_PARQUET: - case CalciteTests.ALL_TYPES_UNIQ_PARQUET: - case CalciteTests.FEW_ROWS_ALL_DATA_PARQUET: - case CalciteTests.T_ALL_TYPE_PARQUET: - index = TestDataBuilder.getQueryableIndexForDrillDatasource(segmentId.getDataSource(), tempFolderProducer.apply("tmpDir")); - break; - case CalciteTests.BENCHMARK_DATASOURCE: - index = TestDataBuilder.getQueryableIndexForBenchmarkDatasource(); - break; - default: - throw new ISE("Cannot query segment %s in test runner", segmentId); - - } - Segment segment = new Segment() - { - @Override - public SegmentId getId() - { - return segmentId; - } - - @Override - public Interval getDataInterval() - { - return segmentId.getInterval(); - } - - @Nullable - @Override - public QueryableIndex asQueryableIndex() - { - return index; - } - - @Override - public CursorFactory asCursorFactory() - { - return new QueryableIndexCursorFactory(index); - } - - @Override - public void close() - { - } - }; - DataSegment dataSegment = DataSegment.builder() - .dataSource(segmentId.getDataSource()) - .interval(segmentId.getInterval()) - .version(segmentId.getVersion()) - .shardSpec(new LinearShardSpec(0)) - .size(0) - .build(); - return new CompleteSegment(dataSegment, segment); - } } diff --git a/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart_with_datasets.iq b/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart_with_datasets.iq new file mode 100644 index 000000000000..65dbce0f087b --- /dev/null +++ b/quidem-ut/src/test/quidem/org.apache.druid.quidem.QTest/dart_with_datasets.iq @@ -0,0 +1,13 @@ +!set dartQueryId 00000000-0000-0000-0000-000000000000 +!use druidtest://?componentSupplier=StandardMSQComponentSupplier&datasets=sql/src/test/quidem/sampledataset +!set outputformat mysql + +select count(1) from "rollup-tutorial"; ++--------+ +| EXPR$0 | ++--------+ +| 9 | ++--------+ +(1 row) + +!ok diff --git a/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java b/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java index ff385e99f641..7cc21ecd083c 100644 --- a/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java +++ b/server/src/test/java/org/apache/druid/server/SpecificSegmentsQuerySegmentWalker.java @@ -23,6 +23,8 @@ import com.google.common.collect.Ordering; import com.google.common.io.Closeables; import com.google.inject.Injector; +import org.apache.curator.shaded.com.google.common.collect.Lists; +import org.apache.druid.error.DruidException; import org.apache.druid.query.DataSource; import org.apache.druid.query.FrameBasedInlineDataSource; import org.apache.druid.query.InlineDataSource; @@ -34,6 +36,7 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.query.lookup.LookupExtractorFactoryContainer; import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.FrameBasedInlineSegmentWrangler; import org.apache.druid.segment.IncrementalIndexSegment; import org.apache.druid.segment.InlineSegmentWrangler; @@ -51,6 +54,7 @@ import org.apache.druid.server.metrics.NoopServiceEmitter; import org.apache.druid.sql.calcite.util.datasets.TestDataSet; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; import org.apache.druid.timeline.VersionedIntervalTimeline; import org.apache.druid.timeline.partition.LinearShardSpec; import org.joda.time.Interval; @@ -76,8 +80,7 @@ public class SpecificSegmentsQuerySegmentWalker implements QuerySegmentWalker, C { private final QuerySegmentWalker walker; private final Map> timelines; - private final List closeables = new ArrayList<>(); - private final List segments = new ArrayList<>(); + private final List segments = new ArrayList<>(); private static final LookupExtractorFactoryContainerProvider LOOKUP_EXTRACTOR_FACTORY_CONTAINER_PROVIDER = new LookupExtractorFactoryContainerProvider() { @@ -185,6 +188,14 @@ public SpecificSegmentsQuerySegmentWalker( public SpecificSegmentsQuerySegmentWalker add(final DataSegment descriptor, final Segment segment) { + return add(new CompleteSegment(descriptor, segment)); + } + + public SpecificSegmentsQuerySegmentWalker add(CompleteSegment completeSegment) + { + DataSegment descriptor = completeSegment.getDataSegment(); + Segment segment = completeSegment.getSegment(); + final ReferenceCountingSegment referenceCountingSegment = ReferenceCountingSegment.wrapSegment( segment, @@ -199,8 +210,7 @@ public SpecificSegmentsQuerySegmentWalker add(final DataSegment descriptor, fina descriptor.getVersion(), descriptor.getShardSpec().createChunk(referenceCountingSegment) ); - segments.add(descriptor); - closeables.add(referenceCountingSegment); + segments.add(completeSegment); return this; } @@ -216,7 +226,7 @@ public SpecificSegmentsQuerySegmentWalker add(final DataSegment descriptor, fina public List getSegments() { - return segments; + return Lists.transform(segments, CompleteSegment::getDataSegment); } @Override @@ -234,7 +244,7 @@ public QueryRunner getQueryRunnerForSegments(final Query query, final @Override public void close() throws IOException { - for (Closeable closeable : closeables) { + for (Closeable closeable : segments) { Closeables.close(closeable, true); } } @@ -253,4 +263,23 @@ public SpecificSegmentsQuerySegmentWalker add(TestDataSet dataset, File tmpDir) indexNumericDims ); } + + public CompleteSegment getSegment(SegmentId segmentId) + { + List matches = new ArrayList<>(1); + for (CompleteSegment s : segments) { + SegmentId id = s.getDataSegment().getId(); + if (id.equals(segmentId)) { + matches.add(s); + } + } + if (matches.size() != 1) { + throw DruidException.defensive( + "SegmentId [%s] has unexpected number of matches! [%s]", + segmentId, + matches + ); + } + return matches.get(0); + } } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java index d88af75df644..e99550d08c15 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/SqlTestFramework.java @@ -963,7 +963,6 @@ private SystemSchema makeSystemSchema(AuthorizerMapper authorizerMapper, DruidSc return CalciteTests.createMockSystemSchema(druidSchema, timelineServerView, authorizerMapper); } - @Provides @LazySingleton private TimelineServerView makeTimelineServerView(SpecificSegmentsQuerySegmentWalker walker)