Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
Expand Down Expand Up @@ -70,7 +70,7 @@ public TaskDataSegmentProvider(
}

@Override
public Supplier<ResourceHolder<Segment>> fetchSegment(
public Supplier<ResourceHolder<CompleteSegment>> fetchSegment(
final SegmentId segmentId,
final ChannelCounters channelCounters,
final boolean isReindex
Expand All @@ -79,7 +79,7 @@ public Supplier<ResourceHolder<Segment>> fetchSegment(
// Returns Supplier<ResourceHolder> instead of ResourceHolder, so the Coordinator calls and segment downloads happen
// in processing threads, rather than the main thread. (They happen when fetchSegmentInternal is called.)
return () -> {
ResourceHolder<Segment> holder = null;
ResourceHolder<CompleteSegment> holder = null;

while (holder == null) {
holder = holders.computeIfAbsent(
Expand All @@ -99,7 +99,7 @@ public Supplier<ResourceHolder<Segment>> fetchSegment(
* Helper used by {@link #fetchSegment(SegmentId, ChannelCounters, boolean)}. Does the actual fetching of a segment, once it
* is determined that we definitely need to go out and get one.
*/
private ReferenceCountingResourceHolder<Segment> fetchSegmentInternal(
private ReferenceCountingResourceHolder<CompleteSegment> fetchSegmentInternal(
final SegmentId segmentId,
final ChannelCounters channelCounters,
final boolean isReindex
Expand Down Expand Up @@ -133,7 +133,7 @@ private ReferenceCountingResourceHolder<Segment> fetchSegmentInternal(
final int numRows = index.getNumRows();
final long size = dataSegment.getSize();
closer.register(() -> channelCounters.addFile(numRows, size));
return new ReferenceCountingResourceHolder<>(segment, closer);
return new ReferenceCountingResourceHolder<>(new CompleteSegment(dataSegment, segment), closer);
}
catch (IOException | SegmentLoadingException e) {
throw CloseableUtils.closeInCatch(
Expand All @@ -143,29 +143,29 @@ private ReferenceCountingResourceHolder<Segment> fetchSegmentInternal(
}
}

private static class SegmentHolder implements Supplier<ResourceHolder<Segment>>
private static class SegmentHolder implements Supplier<ResourceHolder<CompleteSegment>>
{
private final Supplier<ResourceHolder<Segment>> holderSupplier;
private final Supplier<ResourceHolder<CompleteSegment>> holderSupplier;
private final Closeable cleanupFn;

@GuardedBy("this")
private ReferenceCountingResourceHolder<Segment> holder;
private ReferenceCountingResourceHolder<CompleteSegment> holder;

@GuardedBy("this")
private boolean closing;

@GuardedBy("this")
private boolean closed;

public SegmentHolder(Supplier<ResourceHolder<Segment>> holderSupplier, Closeable cleanupFn)
public SegmentHolder(Supplier<ResourceHolder<CompleteSegment>> holderSupplier, Closeable cleanupFn)
{
this.holderSupplier = holderSupplier;
this.cleanupFn = cleanupFn;
}

@Override
@Nullable
public ResourceHolder<Segment> get()
public ResourceHolder<CompleteSegment> get()
{
synchronized (this) {
if (closing) {
Expand All @@ -183,7 +183,7 @@ public ResourceHolder<Segment> get()
// Then, return null so "fetchSegment" will try again.
return null;
} else if (holder == null) {
final ResourceHolder<Segment> segmentHolder = holderSupplier.get();
final ResourceHolder<CompleteSegment> segmentHolder = holderSupplier.get();
holder = new ReferenceCountingResourceHolder<>(
segmentHolder.get(),
() -> {
Expand All @@ -210,7 +210,7 @@ public ResourceHolder<Segment> get()
}
}
);
final ResourceHolder<Segment> retVal = holder.increment();
final ResourceHolder<CompleteSegment> retVal = holder.increment();
// Store already-closed holder, so it disappears when the last reference is closed.
holder.close();
return retVal;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.msq.util.DimensionSchemaUtils;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.RowBasedSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.column.ColumnHolder;
Expand Down Expand Up @@ -163,7 +164,7 @@ private static Iterator<SegmentWithDescriptor> inputSourceSegmentIterator(
signature
);
return new SegmentWithDescriptor(
() -> ResourceHolder.fromCloseable(segment),
() -> ResourceHolder.fromCloseable(new CompleteSegment(null, segment)),
new RichSegmentDescriptor(segmentId.toDescriptor(), null)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.InlineSegmentWrangler;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.timeline.SegmentId;
Expand Down Expand Up @@ -74,7 +75,7 @@ public ReadableInputs attach(
segmentWrangler.getSegmentsForIntervals(dataSource, Intervals.ONLY_ETERNITY),
segment -> ReadableInput.segment(
new SegmentWithDescriptor(
() -> ResourceHolder.fromCloseable(segment),
() -> ResourceHolder.fromCloseable(new CompleteSegment(null, segment)),
DUMMY_SEGMENT_DESCRIPTOR
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.timeline.SegmentId;
Expand Down Expand Up @@ -98,7 +99,7 @@ public ReadableInputs attach(
throw new ISE("Lookup[%s] has multiple segments; cannot read", lookupName);
}

return ResourceHolder.fromCloseable(segment);
return ResourceHolder.fromCloseable(new CompleteSegment(null, segment));
},
new RichSegmentDescriptor(SegmentId.dummy(lookupName).toDescriptor(), null)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Preconditions;
import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.Segment;

import java.util.Objects;
Expand All @@ -31,7 +32,7 @@
*/
public class SegmentWithDescriptor
{
private final Supplier<? extends ResourceHolder<Segment>> segmentSupplier;
private final Supplier<? extends ResourceHolder<CompleteSegment>> segmentSupplier;
private final RichSegmentDescriptor descriptor;

/**
Expand All @@ -42,7 +43,7 @@ public class SegmentWithDescriptor
* @param descriptor segment descriptor
*/
public SegmentWithDescriptor(
final Supplier<? extends ResourceHolder<Segment>> segmentSupplier,
final Supplier<? extends ResourceHolder<CompleteSegment>> segmentSupplier,
final RichSegmentDescriptor descriptor
)
{
Expand All @@ -59,7 +60,7 @@ public SegmentWithDescriptor(
* It is not necessary to call {@link Segment#close()} on the returned segment. Calling {@link ResourceHolder#close()}
* is enough.
*/
public ResourceHolder<Segment> getOrLoad()
public ResourceHolder<CompleteSegment> getOrLoad()
{
return segmentSupplier.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.druid.collections.ResourceHolder;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.timeline.SegmentId;

import java.util.function.Supplier;
Expand All @@ -35,7 +35,7 @@ public interface DataSegmentProvider
* <br>
* It is not necessary to call {@link ResourceHolder#close()} if you never call {@link Supplier#get()}.
*/
Supplier<ResourceHolder<Segment>> fetchSegment(
Supplier<ResourceHolder<CompleteSegment>> fetchSegment(
SegmentId segmentId,
ChannelCounters channelCounters,
boolean isReindex
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.spec.SpecificSegmentSpec;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.TimeBoundaryInspector;
import org.apache.druid.segment.column.RowSignature;
Expand Down Expand Up @@ -152,8 +152,8 @@ protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(DataServerQue
protected ReturnOrAwait<Unit> runWithSegment(final SegmentWithDescriptor segment) throws IOException
{
if (resultYielder == null) {
final ResourceHolder<Segment> segmentHolder = closer.register(segment.getOrLoad());
final SegmentReference mappedSegment = mapSegment(segmentHolder.get());
final ResourceHolder<CompleteSegment> segmentHolder = closer.register(segment.getOrLoad());
final SegmentReference mappedSegment = mapSegment(segmentHolder.get().getSegment());

final Sequence<ResultRow> rowSequence =
groupingEngine.process(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.druid.query.scan.ScanQueryEngine;
import org.apache.druid.query.scan.ScanResultValue;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorHolder;
Expand Down Expand Up @@ -245,9 +246,9 @@ protected ReturnOrAwait<SegmentsInputSlice> runWithDataServerQuery(final DataSer
protected ReturnOrAwait<Unit> runWithSegment(final SegmentWithDescriptor segment) throws IOException
{
if (cursor == null) {
final ResourceHolder<Segment> segmentHolder = closer.register(segment.getOrLoad());
final ResourceHolder<CompleteSegment> segmentHolder = closer.register(segment.getOrLoad());

final Segment mappedSegment = mapSegment(segmentHolder.get());
final Segment mappedSegment = mapSegment(segmentHolder.get().getSegment());
final CursorFactory cursorFactory = mappedSegment.asCursorFactory();
if (cursorFactory == null) {
throw new ISE(
Expand All @@ -264,7 +265,7 @@ protected ReturnOrAwait<Unit> runWithSegment(final SegmentWithDescriptor segment
// No cursors!
return ReturnOrAwait.returnObject(Unit.instance());
} else {
final long rowsFlushed = setNextCursor(nextCursor, segmentHolder.get());
final long rowsFlushed = setNextCursor(nextCursor, segmentHolder.get().getSegment());
assert rowsFlushed == 0; // There's only ever one cursor when running with a segment
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.msq.counters.ChannelCounters;
import org.apache.druid.query.OrderBy;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.DimensionHandler;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.ColumnHolder;
Expand Down Expand Up @@ -187,16 +187,16 @@ public void testConcurrency()
for (int i = 0; i < iterations; i++) {
final int expectedSegmentNumber = i % NUM_SEGMENTS;
final DataSegment segment = segments.get(expectedSegmentNumber);
final ListenableFuture<Supplier<ResourceHolder<Segment>>> f =
final ListenableFuture<Supplier<ResourceHolder<CompleteSegment>>> f =
exec.submit(() -> provider.fetchSegment(segment.getId(), new ChannelCounters(), false));

testFutures.add(
FutureUtils.transform(
f,
holderSupplier -> {
try {
final ResourceHolder<Segment> holder = holderSupplier.get();
Assert.assertEquals(segment.getId(), holder.get().getId());
final ResourceHolder<CompleteSegment> holder = holderSupplier.get();
Assert.assertEquals(segment.getId(), holder.get().getSegment().getId());

final String expectedStorageDir = DataSegmentPusher.getDefaultStorageDir(segment, false);
final File expectedFile = new File(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,14 @@
import org.apache.druid.query.groupby.GroupByQueryRunnerTest;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.groupby.TestGroupByBuffers;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexCursorFactory;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.loading.DataSegmentPusher;
Expand All @@ -86,6 +85,7 @@
import org.apache.druid.sql.calcite.util.TestDataBuilder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Interval;
import org.mockito.Mockito;
Expand Down Expand Up @@ -161,11 +161,10 @@ public String getFormatString()
)
);
ObjectMapper testMapper = MSQTestBase.setupObjectMapper(dummyInjector);
IndexIO indexIO = new IndexIO(testMapper, ColumnConfig.DEFAULT);
SegmentCacheManager segmentCacheManager = new SegmentCacheManagerFactory(TestIndex.INDEX_IO, testMapper)
.manufacturate(cacheManagerDir);
LocalDataSegmentPusherConfig config = new LocalDataSegmentPusherConfig();
MSQTestSegmentManager segmentManager = new MSQTestSegmentManager(segmentCacheManager, indexIO);
MSQTestSegmentManager segmentManager = new MSQTestSegmentManager(segmentCacheManager);
config.storageDirectory = storageDir;
binder.bind(DataSegmentPusher.class).toProvider(() -> new MSQTestDelegateDataSegmentPusher(
new LocalDataSegmentPusher(config),
Expand Down Expand Up @@ -206,7 +205,10 @@ private static DataServerQueryHandlerFactory getTestDataServerQueryHandlerFactor
return mockFactory;
}

private static Supplier<ResourceHolder<Segment>> getSupplierForSegment(Function<String, File> tempFolderProducer, SegmentId segmentId)
protected static Supplier<ResourceHolder<CompleteSegment>> getSupplierForSegment(
Function<String, File> tempFolderProducer,
SegmentId segmentId
)
{
final QueryableIndex index;
switch (segmentId.getDataSource()) {
Expand Down Expand Up @@ -450,6 +452,13 @@ public void close()
{
}
};
return () -> new ReferenceCountingResourceHolder<>(segment, Closer.create());
DataSegment dataSegment = DataSegment.builder()
.dataSource(segmentId.getDataSource())
.interval(segmentId.getInterval())
.version(segmentId.getVersion())
.shardSpec(new LinearShardSpec(0))
.size(0)
.build();
return () -> new ReferenceCountingResourceHolder<>(new CompleteSegment(dataSegment, segment), Closer.create());
}
}
Loading