From 4572a100aa735b98f42de46f3cb7a060254751a0 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 9 Sep 2024 11:38:25 +0530 Subject: [PATCH 1/9] Store datasegment while fetching segment in MSQ --- .../msq/exec/TaskDataSegmentProvider.java | 24 +++--- .../external/ExternalInputSliceReader.java | 3 +- .../input/inline/InlineInputSliceReader.java | 3 +- .../input/lookup/LookupInputSliceReader.java | 3 +- .../input/table/SegmentWithDescriptor.java | 6 +- .../msq/input/table/SegmentWithMetadata.java | 74 +++++++++++++++++++ .../msq/querykit/DataSegmentProvider.java | 3 +- .../GroupByPreShuffleFrameProcessor.java | 5 +- .../scan/ScanQueryFrameProcessor.java | 7 +- .../msq/exec/TaskDataSegmentProviderTest.java | 8 +- .../druid/msq/test/CalciteMSQTestsHelper.java | 15 +++- .../apache/druid/msq/test/MSQTestBase.java | 15 +++- 12 files changed, 134 insertions(+), 32 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithMetadata.java diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java index c327ec340aea..2e992485fdc9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java @@ -28,11 +28,11 @@ import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.msq.querykit.DataSegmentProvider; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; -import org.apache.druid.segment.Segment; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.timeline.DataSegment; @@ -70,7 +70,7 @@ public TaskDataSegmentProvider( } @Override - public Supplier> fetchSegment( + public Supplier> fetchSegment( final SegmentId segmentId, final ChannelCounters channelCounters, final boolean isReindex @@ -79,7 +79,7 @@ public Supplier> fetchSegment( // Returns Supplier instead of ResourceHolder, so the Coordinator calls and segment downloads happen // in processing threads, rather than the main thread. (They happen when fetchSegmentInternal is called.) return () -> { - ResourceHolder holder = null; + ResourceHolder holder = null; while (holder == null) { holder = holders.computeIfAbsent( @@ -99,7 +99,7 @@ public Supplier> fetchSegment( * Helper used by {@link #fetchSegment(SegmentId, ChannelCounters, boolean)}. Does the actual fetching of a segment, once it * is determined that we definitely need to go out and get one. */ - private ReferenceCountingResourceHolder fetchSegmentInternal( + private ReferenceCountingResourceHolder fetchSegmentInternal( final SegmentId segmentId, final ChannelCounters channelCounters, final boolean isReindex @@ -133,7 +133,7 @@ private ReferenceCountingResourceHolder fetchSegmentInternal( final int numRows = index.getNumRows(); final long size = dataSegment.getSize(); closer.register(() -> channelCounters.addFile(numRows, size)); - return new ReferenceCountingResourceHolder<>(segment, closer); + return new ReferenceCountingResourceHolder<>(new SegmentWithMetadata(dataSegment, segment), closer); } catch (IOException | SegmentLoadingException e) { throw CloseableUtils.closeInCatch( @@ -143,13 +143,13 @@ private ReferenceCountingResourceHolder fetchSegmentInternal( } } - private static class SegmentHolder implements Supplier> + private static class SegmentHolder implements Supplier> { - private final Supplier> holderSupplier; + private final Supplier> holderSupplier; private final Closeable cleanupFn; @GuardedBy("this") - private ReferenceCountingResourceHolder holder; + private ReferenceCountingResourceHolder holder; @GuardedBy("this") private boolean closing; @@ -157,7 +157,7 @@ private static class SegmentHolder implements Supplier> @GuardedBy("this") private boolean closed; - public SegmentHolder(Supplier> holderSupplier, Closeable cleanupFn) + public SegmentHolder(Supplier> holderSupplier, Closeable cleanupFn) { this.holderSupplier = holderSupplier; this.cleanupFn = cleanupFn; @@ -165,7 +165,7 @@ public SegmentHolder(Supplier> holderSupplier, Closeable @Override @Nullable - public ResourceHolder get() + public ResourceHolder get() { synchronized (this) { if (closing) { @@ -183,7 +183,7 @@ public ResourceHolder get() // Then, return null so "fetchSegment" will try again. return null; } else if (holder == null) { - final ResourceHolder segmentHolder = holderSupplier.get(); + final ResourceHolder segmentHolder = holderSupplier.get(); holder = new ReferenceCountingResourceHolder<>( segmentHolder.get(), () -> { @@ -210,7 +210,7 @@ public ResourceHolder get() } } ); - final ResourceHolder retVal = holder.increment(); + final ResourceHolder retVal = holder.increment(); // Store already-closed holder, so it disappears when the last reference is closed. holder.close(); return retVal; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java index 4b68a3bf1b01..678a88197891 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java @@ -44,6 +44,7 @@ import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentWithDescriptor; +import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.msq.util.DimensionSchemaUtils; import org.apache.druid.segment.RowBasedSegment; import org.apache.druid.segment.Segment; @@ -163,7 +164,7 @@ private static Iterator inputSourceSegmentIterator( signature ); return new SegmentWithDescriptor( - () -> ResourceHolder.fromCloseable(segment), + () -> ResourceHolder.fromCloseable(new SegmentWithMetadata(null, segment)), new RichSegmentDescriptor(segmentId.toDescriptor(), null) ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java index ef58c7723b33..7c4ac4e7f1eb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java @@ -29,6 +29,7 @@ import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentWithDescriptor; +import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.query.InlineDataSource; import org.apache.druid.segment.InlineSegmentWrangler; import org.apache.druid.segment.SegmentWrangler; @@ -74,7 +75,7 @@ public ReadableInputs attach( segmentWrangler.getSegmentsForIntervals(dataSource, Intervals.ONLY_ETERNITY), segment -> ReadableInput.segment( new SegmentWithDescriptor( - () -> ResourceHolder.fromCloseable(segment), + () -> ResourceHolder.fromCloseable(new SegmentWithMetadata(null, segment)), DUMMY_SEGMENT_DESCRIPTOR ) ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java index 2b327f216f7c..2c5e996f7ce1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java @@ -31,6 +31,7 @@ import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentWithDescriptor; +import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.query.LookupDataSource; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentWrangler; @@ -98,7 +99,7 @@ public ReadableInputs attach( throw new ISE("Lookup[%s] has multiple segments; cannot read", lookupName); } - return ResourceHolder.fromCloseable(segment); + return ResourceHolder.fromCloseable(new SegmentWithMetadata(null, segment)); }, new RichSegmentDescriptor(SegmentId.dummy(lookupName).toDescriptor(), null) ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java index b9026c7b9fb9..feb8dda82292 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java @@ -31,7 +31,7 @@ */ public class SegmentWithDescriptor { - private final Supplier> segmentSupplier; + private final Supplier> segmentSupplier; private final RichSegmentDescriptor descriptor; /** @@ -42,7 +42,7 @@ public class SegmentWithDescriptor * @param descriptor segment descriptor */ public SegmentWithDescriptor( - final Supplier> segmentSupplier, + final Supplier> segmentSupplier, final RichSegmentDescriptor descriptor ) { @@ -59,7 +59,7 @@ public SegmentWithDescriptor( * It is not necessary to call {@link Segment#close()} on the returned segment. Calling {@link ResourceHolder#close()} * is enough. */ - public ResourceHolder getOrLoad() + public ResourceHolder getOrLoad() { return segmentSupplier.get(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithMetadata.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithMetadata.java new file mode 100644 index 000000000000..a4ccfe1a9272 --- /dev/null +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithMetadata.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.input.table; + +import org.apache.druid.segment.Segment; +import org.apache.druid.timeline.DataSegment; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Objects; + +public class SegmentWithMetadata implements Closeable +{ + private final DataSegment dataSegment; + private final Segment segment; + + public SegmentWithMetadata(DataSegment dataSegment, Segment segment) + { + this.dataSegment = dataSegment; + this.segment = segment; + } + + public DataSegment getDataSegment() + { + return dataSegment; + } + + public Segment getSegment() + { + return segment; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SegmentWithMetadata that = (SegmentWithMetadata) o; + return Objects.equals(dataSegment, that.dataSegment) && Objects.equals(segment, that.segment); + } + + @Override + public int hashCode() + { + return Objects.hash(dataSegment, segment); + } + + @Override + public void close() throws IOException + { + segment.close(); + } +} diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java index 91ee4a487885..4ab0833da978 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java @@ -21,6 +21,7 @@ import org.apache.druid.collections.ResourceHolder; import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.segment.Segment; import org.apache.druid.timeline.SegmentId; @@ -35,7 +36,7 @@ public interface DataSegmentProvider *
* It is not necessary to call {@link ResourceHolder#close()} if you never call {@link Supplier#get()}. */ - Supplier> fetchSegment( + Supplier> fetchSegment( SegmentId segmentId, ChannelCounters channelCounters, boolean isReindex diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java index a859ea8cd534..dea7eb985c9a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java @@ -44,6 +44,7 @@ import org.apache.druid.msq.exec.DataServerQueryResult; import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.input.table.SegmentWithDescriptor; +import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.msq.input.table.SegmentsInputSlice; import org.apache.druid.msq.querykit.BaseLeafFrameProcessor; import org.apache.druid.query.groupby.GroupByQuery; @@ -147,8 +148,8 @@ protected ReturnOrAwait runWithDataServerQuery(DataServerQue protected ReturnOrAwait runWithSegment(final SegmentWithDescriptor segment) throws IOException { if (resultYielder == null) { - final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); - final SegmentReference mappedSegment = mapSegment(segmentHolder.get()); + final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); + final SegmentReference mappedSegment = mapSegment(segmentHolder.get().getSegment()); final Sequence rowSequence = groupingEngine.process( diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index f402aa604308..d70a4f42789b 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -54,6 +54,7 @@ import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.input.external.ExternalSegment; import org.apache.druid.msq.input.table.SegmentWithDescriptor; +import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.msq.input.table.SegmentsInputSlice; import org.apache.druid.msq.querykit.BaseLeafFrameProcessor; import org.apache.druid.msq.querykit.QueryKitUtils; @@ -245,9 +246,9 @@ protected ReturnOrAwait runWithDataServerQuery(final DataSer protected ReturnOrAwait runWithSegment(final SegmentWithDescriptor segment) throws IOException { if (cursor == null) { - final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); + final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); - final StorageAdapter adapter = mapSegment(segmentHolder.get()).asStorageAdapter(); + final StorageAdapter adapter = mapSegment(segmentHolder.get().getSegment()).asStorageAdapter(); if (adapter == null) { throw new ISE( "Null storage adapter found. Probably trying to issue a query against a segment being memory unmapped." @@ -263,7 +264,7 @@ protected ReturnOrAwait runWithSegment(final SegmentWithDescriptor segment // No cursors! return ReturnOrAwait.returnObject(Unit.instance()); } else { - final long rowsFlushed = setNextCursor(nextCursor, segmentHolder.get()); + final long rowsFlushed = setNextCursor(nextCursor, segmentHolder.get().getSegment()); assert rowsFlushed == 0; // There's only ever one cursor when running with a segment } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java index b5141e12dc85..9bb1dfe40b38 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java @@ -42,12 +42,12 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.query.OrderBy; import org.apache.druid.segment.DimensionHandler; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.Metadata; import org.apache.druid.segment.QueryableIndex; -import org.apache.druid.segment.Segment; import org.apache.druid.segment.TestHelper; import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.column.ColumnHolder; @@ -187,7 +187,7 @@ public void testConcurrency() for (int i = 0; i < iterations; i++) { final int expectedSegmentNumber = i % NUM_SEGMENTS; final DataSegment segment = segments.get(expectedSegmentNumber); - final ListenableFuture>> f = + final ListenableFuture>> f = exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters(), false)); testFutures.add( @@ -195,8 +195,8 @@ public void testConcurrency() f, holderSupplier -> { try { - final ResourceHolder holder = holderSupplier.get(); - Assert.assertEquals(segment.getId(), holder.get().getId()); + final ResourceHolder holder = holderSupplier.get(); + Assert.assertEquals(segment.getId(), holder.get().getSegment().getId()); final String expectedStorageDir = DataSegmentPusher.getDefaultStorageDir(segment, false); final File expectedFile = new File( diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index eaa2a9efe5ae..d7a3e6c7f87f 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -47,6 +47,7 @@ import org.apache.druid.msq.exec.DataServerQueryHandlerFactory; import org.apache.druid.msq.guice.MSQExternalDataSourceModule; import org.apache.druid.msq.guice.MSQIndexingModule; +import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.msq.querykit.DataSegmentProvider; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ForwardingQueryProcessingPool; @@ -86,6 +87,7 @@ import org.apache.druid.sql.calcite.util.TestDataBuilder; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.LinearShardSpec; import org.easymock.EasyMock; import org.joda.time.Interval; import org.mockito.Mockito; @@ -206,7 +208,10 @@ private static DataServerQueryHandlerFactory getTestDataServerQueryHandlerFactor return mockFactory; } - private static Supplier> getSupplierForSegment(Function tempFolderProducer, SegmentId segmentId) + protected static Supplier> getSupplierForSegment( + Function tempFolderProducer, + SegmentId segmentId + ) { final QueryableIndex index; switch (segmentId.getDataSource()) { @@ -450,6 +455,12 @@ public void close() { } }; - return () -> new ReferenceCountingResourceHolder<>(segment, Closer.create()); + DataSegment dataSegment = DataSegment.builder() + .dataSource(segmentId.getDataSource()) + .interval(segmentId.getInterval()) + .version(segmentId.getVersion()) + .shardSpec(new LinearShardSpec(0)) + .build(); + return () -> new ReferenceCountingResourceHolder<>(new SegmentWithMetadata(dataSegment, segment), Closer.create()); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index bb8691b93225..0ac14902ac1c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -109,6 +109,7 @@ import org.apache.druid.msq.indexing.report.MSQSegmentReport; import org.apache.druid.msq.indexing.report.MSQTaskReport; import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; +import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.querykit.DataSegmentProvider; import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory; @@ -198,6 +199,7 @@ import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.PruneLoadSpec; import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.LinearShardSpec; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.ShardSpec; import org.apache.druid.timeline.partition.TombstoneShardSpec; @@ -619,7 +621,10 @@ private DataServerQueryHandlerFactory getTestDataServerQueryHandlerFactory() } @Nonnull - private Supplier> getSupplierForSegment(Function tempFolderProducer, SegmentId segmentId) + protected Supplier> getSupplierForSegment( + Function tempFolderProducer, + SegmentId segmentId + ) { if (segmentManager.getSegment(segmentId) == null) { final QueryableIndex index; @@ -710,7 +715,13 @@ public void close() }; segmentManager.addSegment(segment); } - return () -> ReferenceCountingResourceHolder.fromCloseable(segmentManager.getSegment(segmentId)); + DataSegment dataSegment = DataSegment.builder() + .dataSource(segmentId.getDataSource()) + .interval(segmentId.getInterval()) + .version(segmentId.getVersion()) + .shardSpec(new LinearShardSpec(0)) + .build(); + return () -> ReferenceCountingResourceHolder.fromCloseable(new SegmentWithMetadata(dataSegment, segmentManager.getSegment(segmentId))); } public SelectTester testSelectQuery() From ccfa2fd59e9631c77953e8e4df0a8ebc184efdcf Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 9 Sep 2024 12:32:26 +0530 Subject: [PATCH 2/9] Add javadoc --- .../druid/msq/input/table/SegmentWithMetadata.java | 9 ++++++++- .../apache/druid/msq/querykit/DataSegmentProvider.java | 1 - .../groupby/GroupByPreShuffleFrameProcessor.java | 1 - 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithMetadata.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithMetadata.java index a4ccfe1a9272..fd3ae58fb0f9 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithMetadata.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithMetadata.java @@ -22,21 +22,28 @@ import org.apache.druid.segment.Segment; import org.apache.druid.timeline.DataSegment; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.Objects; +/** + * Contains the {@link DataSegment} and {@link Segment}. The datasegment could be null if the segment is a dummy, such + * as those created by {@link org.apache.druid.msq.input.inline.InlineInputSliceReader}. + */ public class SegmentWithMetadata implements Closeable { + @Nullable private final DataSegment dataSegment; private final Segment segment; - public SegmentWithMetadata(DataSegment dataSegment, Segment segment) + public SegmentWithMetadata(@Nullable DataSegment dataSegment, Segment segment) { this.dataSegment = dataSegment; this.segment = segment; } + @Nullable public DataSegment getDataSegment() { return dataSegment; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java index 4ab0833da978..d2cb0cfae18e 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java @@ -22,7 +22,6 @@ import org.apache.druid.collections.ResourceHolder; import org.apache.druid.msq.counters.ChannelCounters; import org.apache.druid.msq.input.table.SegmentWithMetadata; -import org.apache.druid.segment.Segment; import org.apache.druid.timeline.SegmentId; import java.util.function.Supplier; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java index dea7eb985c9a..6f410d7e3b01 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java @@ -54,7 +54,6 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.TimeBoundaryInspector; import org.apache.druid.segment.column.RowSignature; From c4bbb87ff313ec809c35b4ad8d29255ac613b99e Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 9 Sep 2024 15:40:28 +0530 Subject: [PATCH 3/9] Fix tests --- .../src/test/java/org/apache/druid/msq/test/MSQTestBase.java | 1 + 1 file changed, 1 insertion(+) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 0ac14902ac1c..28eb6fc8d7d2 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -720,6 +720,7 @@ public void close() .interval(segmentId.getInterval()) .version(segmentId.getVersion()) .shardSpec(new LinearShardSpec(0)) + .size(0) .build(); return () -> ReferenceCountingResourceHolder.fromCloseable(new SegmentWithMetadata(dataSegment, segmentManager.getSegment(segmentId))); } From 23f5601ebfd8af96b74e9ac76e9c5d4536f2e8d2 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 9 Sep 2024 18:59:41 +0530 Subject: [PATCH 4/9] Fix tests --- .../java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java | 1 + 1 file changed, 1 insertion(+) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index d7a3e6c7f87f..fcee8f6418c5 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -460,6 +460,7 @@ public void close() .interval(segmentId.getInterval()) .version(segmentId.getVersion()) .shardSpec(new LinearShardSpec(0)) + .size(0) .build(); return () -> new ReferenceCountingResourceHolder<>(new SegmentWithMetadata(dataSegment, segment), Closer.create()); } From 75a49beb8f96c649e078dd8658a8761dc881e1cd Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Mon, 9 Sep 2024 19:12:14 +0530 Subject: [PATCH 5/9] Refactor tests a bit --- .../druid/msq/test/CalciteMSQTestsHelper.java | 5 +---- .../apache/druid/msq/test/MSQTestBase.java | 8 ++++---- .../MSQTestDelegateDataSegmentPusher.java | 2 +- .../druid/msq/test/MSQTestSegmentManager.java | 19 ++++++++++--------- 4 files changed, 16 insertions(+), 18 deletions(-) diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index fcee8f6418c5..31cb363ffcfa 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -63,14 +63,12 @@ import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.segment.IndexBuilder; -import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.Segment; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.TestIndex; -import org.apache.druid.segment.column.ColumnConfig; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.loading.DataSegmentPusher; @@ -163,11 +161,10 @@ public String getFormatString() ) ); ObjectMapper testMapper = MSQTestBase.setupObjectMapper(dummyInjector); - IndexIO indexIO = new IndexIO(testMapper, ColumnConfig.DEFAULT); SegmentCacheManager segmentCacheManager = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, testMapper) .manufacturate(cacheManagerDir); LocalDataSegmentPusherConfig config = new LocalDataSegmentPusherConfig(); - MSQTestSegmentManager segmentManager = new MSQTestSegmentManager(segmentCacheManager, indexIO); + MSQTestSegmentManager segmentManager = new MSQTestSegmentManager(segmentCacheManager); config.storageDirectory = storageDir; binder.bind(DataSegmentPusher.class).toProvider(() -> new MSQTestDelegateDataSegmentPusher( new LocalDataSegmentPusher(config), diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 28eb6fc8d7d2..2d5f9def61db 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -423,7 +423,7 @@ public void setUp2() throws Exception MSQSqlModule sqlModule = new MSQSqlModule(); - segmentManager = new MSQTestSegmentManager(segmentCacheManager, indexIO); + segmentManager = new MSQTestSegmentManager(segmentCacheManager); BrokerClient brokerClient = mock(BrokerClient.class); List modules = ImmutableList.of( @@ -1211,17 +1211,17 @@ public void verifyResults() verifyLookupLoadingInfoInTaskContext(msqControllerTask.getContext()); log.info( "found generated segments: %s", - segmentManager.getAllDataSegments().stream().map(s -> s.toString()).collect( + segmentManager.getAllTestGeneratedDataSegments().stream().map(s -> s.toString()).collect( Collectors.joining("\n")) ); // check if segments are created if (!expectedResultRows.isEmpty()) { - Assert.assertNotEquals(0, segmentManager.getAllDataSegments().size()); + Assert.assertNotEquals(0, segmentManager.getAllTestGeneratedDataSegments().size()); } String foundDataSource = null; SortedMap>> segmentIdVsOutputRowsMap = new TreeMap<>(); - for (DataSegment dataSegment : segmentManager.getAllDataSegments()) { + for (DataSegment dataSegment : segmentManager.getAllTestGeneratedDataSegments()) { //Assert shard spec class Assert.assertEquals(expectedShardSpec, dataSegment.getShardSpec().getClass()); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java index 73fca53682c6..5d22c08e6022 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestDelegateDataSegmentPusher.java @@ -60,7 +60,7 @@ public String getPathForHadoop() public DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException { final DataSegment dataSegment = delegate.push(file, segment, useUniquePath); - segmentManager.addDataSegment(dataSegment); + segmentManager.addTestGeneratedDataSegment(dataSegment); return dataSegment; } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestSegmentManager.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestSegmentManager.java index 6151cb37cc2a..2c0fdac3a69c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestSegmentManager.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestSegmentManager.java @@ -20,7 +20,6 @@ package org.apache.druid.msq.test; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.Segment; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.loading.SegmentLoadingException; @@ -37,24 +36,26 @@ */ public class MSQTestSegmentManager { - private final ConcurrentMap dataSegments = new ConcurrentHashMap<>(); + private final ConcurrentMap testGeneratedDataSegments = new ConcurrentHashMap<>(); private final ConcurrentMap segments = new ConcurrentHashMap<>(); private final SegmentCacheManager segmentCacheManager; - private final IndexIO indexIO; final Object lock = new Object(); - public MSQTestSegmentManager(SegmentCacheManager segmentCacheManager, IndexIO indexIO) + public MSQTestSegmentManager(SegmentCacheManager segmentCacheManager) { this.segmentCacheManager = segmentCacheManager; - this.indexIO = indexIO; } - public void addDataSegment(DataSegment dataSegment) + /** + * Registers a data segment which was generated during the test run (as opposed to during setup). This is used to + * validate which segments are generated by the test. + */ + public void addTestGeneratedDataSegment(DataSegment dataSegment) { synchronized (lock) { - dataSegments.put(dataSegment.getId(), dataSegment); + testGeneratedDataSegments.put(dataSegment.getId(), dataSegment); try { segmentCacheManager.getSegmentFiles(dataSegment); @@ -65,9 +66,9 @@ public void addDataSegment(DataSegment dataSegment) } } - public Collection getAllDataSegments() + public Collection getAllTestGeneratedDataSegments() { - return dataSegments.values(); + return testGeneratedDataSegments.values(); } public void addSegment(Segment segment) From 59c1336acc1fcd91c42acd35f6c2e2db053d2cc6 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 10 Sep 2024 09:18:42 +0530 Subject: [PATCH 6/9] Add test --- .../input/table/SegmentWithMetadataTest.java | 35 +++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/SegmentWithMetadataTest.java diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/SegmentWithMetadataTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/SegmentWithMetadataTest.java new file mode 100644 index 000000000000..cda0326ea660 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/SegmentWithMetadataTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.input.table; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Test; + +public class SegmentWithMetadataTest +{ + @Test + public void testEquals() + { + EqualsVerifier.forClass(SegmentWithMetadata.class) + .withNonnullFields("segment", "dataSegment") + .usingGetClass() + .verify(); + } +} From 4b0b378e24874aaf1c95206496fa339d04813807 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 17 Sep 2024 12:49:27 +0530 Subject: [PATCH 7/9] Rename class --- .../msq/exec/TaskDataSegmentProvider.java | 24 +++++++++---------- .../external/ExternalInputSliceReader.java | 4 ++-- .../input/inline/InlineInputSliceReader.java | 4 ++-- .../input/lookup/LookupInputSliceReader.java | 4 ++-- .../input/table/SegmentWithDescriptor.java | 7 +++--- .../msq/querykit/DataSegmentProvider.java | 4 ++-- .../GroupByPreShuffleFrameProcessor.java | 4 ++-- .../scan/ScanQueryFrameProcessor.java | 4 ++-- .../msq/exec/TaskDataSegmentProviderTest.java | 6 ++--- .../druid/msq/test/CalciteMSQTestsHelper.java | 6 ++--- .../apache/druid/msq/test/MSQTestBase.java | 6 ++--- .../apache/druid/segment/CompleteSegment.java | 9 ++++--- .../druid/segment/CompleteSegmentTest.java | 6 ++--- 13 files changed, 44 insertions(+), 44 deletions(-) rename extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithMetadata.java => processing/src/main/java/org/apache/druid/segment/CompleteSegment.java (87%) rename extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/SegmentWithMetadataTest.java => processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java (88%) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java index 2e992485fdc9..04b08f9346af 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/TaskDataSegmentProvider.java @@ -28,8 +28,8 @@ import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.msq.counters.ChannelCounters; -import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.msq.querykit.DataSegmentProvider; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; @@ -70,7 +70,7 @@ public TaskDataSegmentProvider( } @Override - public Supplier> fetchSegment( + public Supplier> fetchSegment( final SegmentId segmentId, final ChannelCounters channelCounters, final boolean isReindex @@ -79,7 +79,7 @@ public Supplier> fetchSegment( // Returns Supplier instead of ResourceHolder, so the Coordinator calls and segment downloads happen // in processing threads, rather than the main thread. (They happen when fetchSegmentInternal is called.) return () -> { - ResourceHolder holder = null; + ResourceHolder holder = null; while (holder == null) { holder = holders.computeIfAbsent( @@ -99,7 +99,7 @@ public Supplier> fetchSegment( * Helper used by {@link #fetchSegment(SegmentId, ChannelCounters, boolean)}. Does the actual fetching of a segment, once it * is determined that we definitely need to go out and get one. */ - private ReferenceCountingResourceHolder fetchSegmentInternal( + private ReferenceCountingResourceHolder fetchSegmentInternal( final SegmentId segmentId, final ChannelCounters channelCounters, final boolean isReindex @@ -133,7 +133,7 @@ private ReferenceCountingResourceHolder fetchSegmentInterna final int numRows = index.getNumRows(); final long size = dataSegment.getSize(); closer.register(() -> channelCounters.addFile(numRows, size)); - return new ReferenceCountingResourceHolder<>(new SegmentWithMetadata(dataSegment, segment), closer); + return new ReferenceCountingResourceHolder<>(new CompleteSegment(dataSegment, segment), closer); } catch (IOException | SegmentLoadingException e) { throw CloseableUtils.closeInCatch( @@ -143,13 +143,13 @@ private ReferenceCountingResourceHolder fetchSegmentInterna } } - private static class SegmentHolder implements Supplier> + private static class SegmentHolder implements Supplier> { - private final Supplier> holderSupplier; + private final Supplier> holderSupplier; private final Closeable cleanupFn; @GuardedBy("this") - private ReferenceCountingResourceHolder holder; + private ReferenceCountingResourceHolder holder; @GuardedBy("this") private boolean closing; @@ -157,7 +157,7 @@ private static class SegmentHolder implements Supplier> holderSupplier, Closeable cleanupFn) + public SegmentHolder(Supplier> holderSupplier, Closeable cleanupFn) { this.holderSupplier = holderSupplier; this.cleanupFn = cleanupFn; @@ -165,7 +165,7 @@ public SegmentHolder(Supplier> holderSupplie @Override @Nullable - public ResourceHolder get() + public ResourceHolder get() { synchronized (this) { if (closing) { @@ -183,7 +183,7 @@ public ResourceHolder get() // Then, return null so "fetchSegment" will try again. return null; } else if (holder == null) { - final ResourceHolder segmentHolder = holderSupplier.get(); + final ResourceHolder segmentHolder = holderSupplier.get(); holder = new ReferenceCountingResourceHolder<>( segmentHolder.get(), () -> { @@ -210,7 +210,7 @@ public ResourceHolder get() } } ); - final ResourceHolder retVal = holder.increment(); + final ResourceHolder retVal = holder.increment(); // Store already-closed holder, so it disappears when the last reference is closed. holder.close(); return retVal; diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java index 678a88197891..2a863fa55257 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/external/ExternalInputSliceReader.java @@ -44,8 +44,8 @@ import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentWithDescriptor; -import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.msq.util.DimensionSchemaUtils; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.RowBasedSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.column.ColumnHolder; @@ -164,7 +164,7 @@ private static Iterator inputSourceSegmentIterator( signature ); return new SegmentWithDescriptor( - () -> ResourceHolder.fromCloseable(new SegmentWithMetadata(null, segment)), + () -> ResourceHolder.fromCloseable(new CompleteSegment(null, segment)), new RichSegmentDescriptor(segmentId.toDescriptor(), null) ); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java index 7c4ac4e7f1eb..8a05ce1527e4 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/inline/InlineInputSliceReader.java @@ -29,8 +29,8 @@ import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentWithDescriptor; -import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.query.InlineDataSource; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.InlineSegmentWrangler; import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.timeline.SegmentId; @@ -75,7 +75,7 @@ public ReadableInputs attach( segmentWrangler.getSegmentsForIntervals(dataSource, Intervals.ONLY_ETERNITY), segment -> ReadableInput.segment( new SegmentWithDescriptor( - () -> ResourceHolder.fromCloseable(new SegmentWithMetadata(null, segment)), + () -> ResourceHolder.fromCloseable(new CompleteSegment(null, segment)), DUMMY_SEGMENT_DESCRIPTOR ) ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java index 2c5e996f7ce1..85f0b10718db 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/lookup/LookupInputSliceReader.java @@ -31,8 +31,8 @@ import org.apache.druid.msq.input.ReadableInputs; import org.apache.druid.msq.input.table.RichSegmentDescriptor; import org.apache.druid.msq.input.table.SegmentWithDescriptor; -import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.query.LookupDataSource; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.Segment; import org.apache.druid.segment.SegmentWrangler; import org.apache.druid.timeline.SegmentId; @@ -99,7 +99,7 @@ public ReadableInputs attach( throw new ISE("Lookup[%s] has multiple segments; cannot read", lookupName); } - return ResourceHolder.fromCloseable(new SegmentWithMetadata(null, segment)); + return ResourceHolder.fromCloseable(new CompleteSegment(null, segment)); }, new RichSegmentDescriptor(SegmentId.dummy(lookupName).toDescriptor(), null) ) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java index feb8dda82292..343f7994d1e4 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithDescriptor.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.Segment; import java.util.Objects; @@ -31,7 +32,7 @@ */ public class SegmentWithDescriptor { - private final Supplier> segmentSupplier; + private final Supplier> segmentSupplier; private final RichSegmentDescriptor descriptor; /** @@ -42,7 +43,7 @@ public class SegmentWithDescriptor * @param descriptor segment descriptor */ public SegmentWithDescriptor( - final Supplier> segmentSupplier, + final Supplier> segmentSupplier, final RichSegmentDescriptor descriptor ) { @@ -59,7 +60,7 @@ public SegmentWithDescriptor( * It is not necessary to call {@link Segment#close()} on the returned segment. Calling {@link ResourceHolder#close()} * is enough. */ - public ResourceHolder getOrLoad() + public ResourceHolder getOrLoad() { return segmentSupplier.get(); } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java index d2cb0cfae18e..232e85166a0f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSegmentProvider.java @@ -21,7 +21,7 @@ import org.apache.druid.collections.ResourceHolder; import org.apache.druid.msq.counters.ChannelCounters; -import org.apache.druid.msq.input.table.SegmentWithMetadata; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.timeline.SegmentId; import java.util.function.Supplier; @@ -35,7 +35,7 @@ public interface DataSegmentProvider *
* It is not necessary to call {@link ResourceHolder#close()} if you never call {@link Supplier#get()}. */ - Supplier> fetchSegment( + Supplier> fetchSegment( SegmentId segmentId, ChannelCounters channelCounters, boolean isReindex diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java index 3e4d9b42e22b..ad456a45f5b1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByPreShuffleFrameProcessor.java @@ -45,7 +45,6 @@ import org.apache.druid.msq.exec.DataServerQueryResult; import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.input.table.SegmentWithDescriptor; -import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.msq.input.table.SegmentsInputSlice; import org.apache.druid.msq.querykit.BaseLeafFrameProcessor; import org.apache.druid.query.groupby.GroupByQuery; @@ -55,6 +54,7 @@ import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.SegmentReference; import org.apache.druid.segment.TimeBoundaryInspector; import org.apache.druid.segment.column.RowSignature; @@ -152,7 +152,7 @@ protected ReturnOrAwait runWithDataServerQuery(DataServerQue protected ReturnOrAwait runWithSegment(final SegmentWithDescriptor segment) throws IOException { if (resultYielder == null) { - final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); + final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); final SegmentReference mappedSegment = mapSegment(segmentHolder.get().getSegment()); final Sequence rowSequence = diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index 0b44e12a62ce..e5fa0a03d621 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -54,7 +54,6 @@ import org.apache.druid.msq.input.ReadableInput; import org.apache.druid.msq.input.external.ExternalSegment; import org.apache.druid.msq.input.table.SegmentWithDescriptor; -import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.msq.input.table.SegmentsInputSlice; import org.apache.druid.msq.querykit.BaseLeafFrameProcessor; import org.apache.druid.msq.querykit.QueryKitUtils; @@ -65,6 +64,7 @@ import org.apache.druid.query.scan.ScanQueryEngine; import org.apache.druid.query.scan.ScanResultValue; import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.Cursor; import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.CursorHolder; @@ -246,7 +246,7 @@ protected ReturnOrAwait runWithDataServerQuery(final DataSer protected ReturnOrAwait runWithSegment(final SegmentWithDescriptor segment) throws IOException { if (cursor == null) { - final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); + final ResourceHolder segmentHolder = closer.register(segment.getOrLoad()); final Segment mappedSegment = mapSegment(segmentHolder.get().getSegment()); final CursorFactory cursorFactory = mappedSegment.asCursorFactory(); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java index 9bb1dfe40b38..a9bf5e91bb49 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/TaskDataSegmentProviderTest.java @@ -42,8 +42,8 @@ import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.msq.counters.ChannelCounters; -import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.query.OrderBy; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.DimensionHandler; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.Metadata; @@ -187,7 +187,7 @@ public void testConcurrency() for (int i = 0; i < iterations; i++) { final int expectedSegmentNumber = i % NUM_SEGMENTS; final DataSegment segment = segments.get(expectedSegmentNumber); - final ListenableFuture>> f = + final ListenableFuture>> f = exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters(), false)); testFutures.add( @@ -195,7 +195,7 @@ public void testConcurrency() f, holderSupplier -> { try { - final ResourceHolder holder = holderSupplier.get(); + final ResourceHolder holder = holderSupplier.get(); Assert.assertEquals(segment.getId(), holder.get().getSegment().getId()); final String expectedStorageDir = DataSegmentPusher.getDefaultStorageDir(segment, false); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 4e7977424065..7d65cea9872c 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -47,7 +47,6 @@ import org.apache.druid.msq.exec.DataServerQueryHandlerFactory; import org.apache.druid.msq.guice.MSQExternalDataSourceModule; import org.apache.druid.msq.guice.MSQIndexingModule; -import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.msq.querykit.DataSegmentProvider; import org.apache.druid.query.DruidProcessingConfig; import org.apache.druid.query.ForwardingQueryProcessingPool; @@ -62,6 +61,7 @@ import org.apache.druid.query.groupby.GroupByQueryRunnerTest; import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.TestGroupByBuffers; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.IndexSpec; @@ -205,7 +205,7 @@ private static DataServerQueryHandlerFactory getTestDataServerQueryHandlerFactor return mockFactory; } - protected static Supplier> getSupplierForSegment( + protected static Supplier> getSupplierForSegment( Function tempFolderProducer, SegmentId segmentId ) @@ -459,6 +459,6 @@ public void close() .shardSpec(new LinearShardSpec(0)) .size(0) .build(); - return () -> new ReferenceCountingResourceHolder<>(new SegmentWithMetadata(dataSegment, segment), Closer.create()); + return () -> new ReferenceCountingResourceHolder<>(new CompleteSegment(dataSegment, segment), Closer.create()); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java index 0cf7567e91f1..f0ef697cc380 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java @@ -109,7 +109,6 @@ import org.apache.druid.msq.indexing.report.MSQSegmentReport; import org.apache.druid.msq.indexing.report.MSQTaskReport; import org.apache.druid.msq.indexing.report.MSQTaskReportPayload; -import org.apache.druid.msq.input.table.SegmentWithMetadata; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.querykit.DataSegmentProvider; import org.apache.druid.msq.shuffle.input.DurableStorageInputChannelFactory; @@ -135,6 +134,7 @@ import org.apache.druid.query.groupby.GroupingEngine; import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.rpc.ServiceClientFactory; +import org.apache.druid.segment.CompleteSegment; import org.apache.druid.segment.CursorFactory; import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.IndexIO; @@ -635,7 +635,7 @@ private DataServerQueryHandlerFactory getTestDataServerQueryHandlerFactory() } @Nonnull - protected Supplier> getSupplierForSegment( + protected Supplier> getSupplierForSegment( Function tempFolderProducer, SegmentId segmentId ) @@ -736,7 +736,7 @@ public void close() .shardSpec(new LinearShardSpec(0)) .size(0) .build(); - return () -> ReferenceCountingResourceHolder.fromCloseable(new SegmentWithMetadata(dataSegment, segmentManager.getSegment(segmentId))); + return () -> ReferenceCountingResourceHolder.fromCloseable(new CompleteSegment(dataSegment, segmentManager.getSegment(segmentId))); } public SelectTester testSelectQuery() diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithMetadata.java b/processing/src/main/java/org/apache/druid/segment/CompleteSegment.java similarity index 87% rename from extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithMetadata.java rename to processing/src/main/java/org/apache/druid/segment/CompleteSegment.java index fd3ae58fb0f9..fd021d03bcfe 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/SegmentWithMetadata.java +++ b/processing/src/main/java/org/apache/druid/segment/CompleteSegment.java @@ -17,9 +17,8 @@ * under the License. */ -package org.apache.druid.msq.input.table; +package org.apache.druid.segment; -import org.apache.druid.segment.Segment; import org.apache.druid.timeline.DataSegment; import javax.annotation.Nullable; @@ -31,13 +30,13 @@ * Contains the {@link DataSegment} and {@link Segment}. The datasegment could be null if the segment is a dummy, such * as those created by {@link org.apache.druid.msq.input.inline.InlineInputSliceReader}. */ -public class SegmentWithMetadata implements Closeable +public class CompleteSegment implements Closeable { @Nullable private final DataSegment dataSegment; private final Segment segment; - public SegmentWithMetadata(@Nullable DataSegment dataSegment, Segment segment) + public CompleteSegment(@Nullable DataSegment dataSegment, Segment segment) { this.dataSegment = dataSegment; this.segment = segment; @@ -63,7 +62,7 @@ public boolean equals(Object o) if (o == null || getClass() != o.getClass()) { return false; } - SegmentWithMetadata that = (SegmentWithMetadata) o; + CompleteSegment that = (CompleteSegment) o; return Objects.equals(dataSegment, that.dataSegment) && Objects.equals(segment, that.segment); } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/SegmentWithMetadataTest.java b/processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java similarity index 88% rename from extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/SegmentWithMetadataTest.java rename to processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java index cda0326ea660..09c0ce2c1fd7 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/input/table/SegmentWithMetadataTest.java +++ b/processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java @@ -17,17 +17,17 @@ * under the License. */ -package org.apache.druid.msq.input.table; +package org.apache.druid.segment; import nl.jqno.equalsverifier.EqualsVerifier; import org.junit.Test; -public class SegmentWithMetadataTest +public class CompleteSegmentTest { @Test public void testEquals() { - EqualsVerifier.forClass(SegmentWithMetadata.class) + EqualsVerifier.forClass(CompleteSegment.class) .withNonnullFields("segment", "dataSegment") .usingGetClass() .verify(); From 2a2e36aa1c61d68f26b8d04fb5cdddb00febfa69 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Tue, 17 Sep 2024 14:52:19 +0530 Subject: [PATCH 8/9] Fix checkstyle --- .../src/main/java/org/apache/druid/segment/CompleteSegment.java | 1 + 1 file changed, 1 insertion(+) diff --git a/processing/src/main/java/org/apache/druid/segment/CompleteSegment.java b/processing/src/main/java/org/apache/druid/segment/CompleteSegment.java index fd021d03bcfe..d44781774ae0 100644 --- a/processing/src/main/java/org/apache/druid/segment/CompleteSegment.java +++ b/processing/src/main/java/org/apache/druid/segment/CompleteSegment.java @@ -43,6 +43,7 @@ public CompleteSegment(@Nullable DataSegment dataSegment, Segment segment) } @Nullable + @SuppressWarnings("unused") public DataSegment getDataSegment() { return dataSegment; From 0b997b1b5a5e8f70a13d44b39c11ca9d322e16a8 Mon Sep 17 00:00:00 2001 From: Adarsh Sanjeev Date: Wed, 18 Sep 2024 09:02:53 +0530 Subject: [PATCH 9/9] Add test --- .../apache/druid/segment/CompleteSegmentTest.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java b/processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java index 09c0ce2c1fd7..2c256c1d2adf 100644 --- a/processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java +++ b/processing/src/test/java/org/apache/druid/segment/CompleteSegmentTest.java @@ -22,8 +22,22 @@ import nl.jqno.equalsverifier.EqualsVerifier; import org.junit.Test; +import java.io.IOException; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + public class CompleteSegmentTest { + @Test + public void testCloseSegment() throws IOException + { + Segment segment = mock(Segment.class); + CompleteSegment completeSegment = new CompleteSegment(null, segment); + completeSegment.close(); + verify(segment).close(); + } + @Test public void testEquals() {