Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void testTopNWithDistinctCountAgg() throws Exception
final Iterable<Result<TopNResultValue>> results =
engine.query(
query,
new IncrementalIndexSegment(index, SegmentId.dummy(QueryRunnerTestHelper.DATA_SOURCE)),
new IncrementalIndexSegment(index, SegmentId.simpleTable(QueryRunnerTestHelper.DATA_SOURCE)),
null
).toList();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@

import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.msq.input.external.ExternalSegment;
import org.apache.druid.msq.input.inline.InlineInputSliceReader;
import org.apache.druid.query.lookup.LookupSegment;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.timeline.SegmentId;

import javax.annotation.Nullable;

/**
* Utility class containing methods that help in generating the {@link org.apache.druid.sql.calcite.parser.ParseException}
Expand All @@ -39,22 +36,20 @@ public class ParseExceptionUtils
* Given a segment, this returns the human-readable description of the segment which can allow user to figure out the
* source of the parse exception
*/
@Nullable
public static String generateReadableInputSourceNameFromMappedSegment(Segment segment)
{
if (segment instanceof ExternalSegment) {
return StringUtils.format(
"external input source: %s",
((ExternalSegment) segment).externalInputSource().toString()
);
} else if (segment instanceof LookupSegment) {
return StringUtils.format("lookup input source: %s", segment.getId().getDataSource());
} else if (segment instanceof QueryableIndexSegment) {
return StringUtils.format("table input source: %s", segment.getId().getDataSource());
} else if (InlineInputSliceReader.SEGMENT_ID.equals(segment.getId().getDataSource())) {
return "inline input source";
}

return null;
SegmentId segmentId = segment.getId();
return StringUtils.format(
"%s input source: %s",
StringUtils.toLowerCase(segmentId.getDataSourceType().name()),
segmentId
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.SimpleRowIngestionMeters;
import org.apache.druid.timeline.SegmentId;

import java.io.File;
import java.io.IOException;
Expand All @@ -65,7 +64,6 @@
*/
public class ExternalInputSliceReader implements InputSliceReader
{
public static final String SEGMENT_ID = "__external";
private final File temporaryDirectory;

public ExternalInputSliceReader(final File temporaryDirectory)
Expand Down Expand Up @@ -153,7 +151,6 @@ private static Iterator<SegmentWithDescriptor> inputSourceSegmentIterator(
reader = inputSource.reader(schema, inputFormat, temporaryDirectory);
}

final SegmentId segmentId = SegmentId.dummy(SEGMENT_ID);
final Segment segment = new ExternalSegment(
inputSource,
reader,
Expand All @@ -165,7 +162,7 @@ private static Iterator<SegmentWithDescriptor> inputSourceSegmentIterator(
);
return new SegmentWithDescriptor(
() -> ResourceHolder.fromCloseable(new CompleteSegment(null, segment)),
new RichSegmentDescriptor(segmentId.toDescriptor(), null)
new RichSegmentDescriptor(ExternalSegment.SEGMENT_ID.toDescriptor(), null)
);
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@
*/
public class ExternalSegment extends RowBasedSegment<InputRow>
{

public static final SegmentId SEGMENT_ID = SegmentId.simple(SegmentId.DataSourceType.EXTERNAL);
private final InputSource inputSource;
private final RowSignature signature;
public static final String SEGMENT_ID = "__external";

/**
* @param inputSource {@link InputSource} that the segment is a representation of
Expand All @@ -67,7 +66,7 @@ public ExternalSegment(
)
{
super(
SegmentId.dummy(SEGMENT_ID),
SEGMENT_ID,
new BaseSequence<>(
new BaseSequence.IteratorMaker<InputRow, CloseableIterator<InputRow>>()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.InlineSegmentWrangler;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.timeline.SegmentId;

import java.util.function.Consumer;

Expand All @@ -43,10 +42,6 @@
*/
public class InlineInputSliceReader implements InputSliceReader
{
public static final String SEGMENT_ID = "__inline";
private static final RichSegmentDescriptor DUMMY_SEGMENT_DESCRIPTOR
= new RichSegmentDescriptor(SegmentId.dummy(SEGMENT_ID).toDescriptor(), null);

private final SegmentWrangler segmentWrangler;

public InlineInputSliceReader(SegmentWrangler segmentWrangler)
Expand Down Expand Up @@ -76,7 +71,7 @@ public ReadableInputs attach(
segment -> ReadableInput.segment(
new SegmentWithDescriptor(
() -> ResourceHolder.fromCloseable(new CompleteSegment(null, segment)),
DUMMY_SEGMENT_DESCRIPTOR
new RichSegmentDescriptor(InlineSegmentWrangler.SEGMENT_ID.toDescriptor(), null)
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.msq.input.table.RichSegmentDescriptor;
import org.apache.druid.msq.input.table.SegmentWithDescriptor;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.lookup.LookupSegment;
import org.apache.druid.segment.CompleteSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentWrangler;
Expand Down Expand Up @@ -71,6 +72,7 @@ public ReadableInputs attach(
)
{
final String lookupName = ((LookupInputSlice) slice).getLookupName();
final SegmentId lookupSegmentId = LookupSegment.buildSegmentId(lookupName);

return ReadableInputs.segments(
() -> Iterators.singletonIterator(
Expand Down Expand Up @@ -101,7 +103,7 @@ public ReadableInputs attach(

return ResourceHolder.fromCloseable(new CompleteSegment(null, segment));
},
new RichSegmentDescriptor(SegmentId.dummy(lookupName).toDescriptor(), null)
new RichSegmentDescriptor(lookupSegmentId.toDescriptor(), null)
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import org.apache.druid.segment.SegmentReference;
import org.apache.druid.segment.TimeBoundaryInspector;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.SegmentId;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -187,7 +186,7 @@ protected ReturnOrAwait<Unit> runWithInputChannel(

if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
final FrameSegment frameSegment = new FrameSegment(frame, inputFrameReader, SegmentId.dummy("x"));
final FrameSegment frameSegment = new FrameSegment(frame, inputFrameReader, "x");
final SegmentReference mappedSegment = mapSegment(frameSegment);

final Sequence<ResultRow> rowSequence =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.druid.sql.calcite.run.SqlResults;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.timeline.SegmentId;

import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -151,7 +150,7 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOEx

private void exportFrame(final Frame frame)
{
final Segment segment = new FrameSegment(frame, frameReader, SegmentId.dummy("test"));
final Segment segment = new FrameSegment(frame, frameReader, "test");
try (final CursorHolder cursorHolder = segment.asCursorFactory().makeCursorHolder(CursorBuildSpec.FULL_SCAN)) {
final Cursor cursor = cursorHolder.asCursor();
if (cursor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.utils.CloseableUtils;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -313,7 +312,7 @@ protected ReturnOrAwait<Unit> runWithInputChannel(
if (cursor == null || cursor.isDone()) {
if (inputChannel.canRead()) {
final Frame frame = inputChannel.read();
final FrameSegment frameSegment = new FrameSegment(frame, inputFrameReader, SegmentId.dummy("scan"));
final FrameSegment frameSegment = new FrameSegment(frame, inputFrameReader, "scan");

final Segment mappedSegment = mapSegment(frameSegment);
final CursorFactory cursorFactory = mappedSegment.asCursorFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,10 @@
import org.apache.druid.msq.test.MSQTestTaskActionClient;
import org.apache.druid.query.Druids;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.JoinDataSource;
import org.apache.druid.query.LookupDataSource;
import org.apache.druid.query.OrderBy;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryDataSource;
Expand Down Expand Up @@ -388,6 +391,75 @@ public void testUnnestOnRestrictedPassedPolicyValidation() throws Exception
);
}

@Test
public void testInlineDataSourcePassedPolicyValidation() throws Exception
{
// Arrange
policyEnforcer = new RestrictAllTablesPolicyEnforcer(null);
RowSignature resultSignature = RowSignature.builder()
.add("EXPR$0", ColumnType.LONG)
.build();
fieldMapping = buildFieldMapping(resultSignature);
InlineDataSource inlineDataSource = InlineDataSource.fromIterable(
ImmutableList.of(new Object[]{2L}),
resultSignature
);
Query query = new Druids.ScanQueryBuilder().resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.dataSource(inlineDataSource)
.eternityInterval()
.columns(resultSignature.getColumnNames())
.columnTypes(resultSignature.getColumnTypes())
.build();
DruidQuery druidQueryMock = buildDruidQueryMock(query, resultSignature);
// Act
msqTaskQueryMaker = getMSQTaskQueryMaker();
QueryResponse<Object[]> response = msqTaskQueryMaker.runQuery(druidQueryMock);
// Assert
String taskId = (String) Iterables.getOnlyElement(response.getResults().toList())[0];
MSQTaskReportPayload payload = (MSQTaskReportPayload) fakeOverlordClient.taskReportAsMap(taskId)
.get()
.get(MSQTaskReport.REPORT_KEY)
.getPayload();
Assert.assertTrue(payload.getStatus().getStatus().isSuccess());
ImmutableList<Object[]> expectedResults = ImmutableList.of(new Object[]{2L});
assertResultsEquals("select 1 + 1", expectedResults, payload.getResults().getResults());
}

@Test
public void testLookupDataSourcePassedPolicyValidation() throws Exception
{
// Arrange
policyEnforcer = new RestrictAllTablesPolicyEnforcer(null);
final RowSignature resultSignature = RowSignature.builder().add("v", ColumnType.STRING).build();
fieldMapping = buildFieldMapping(resultSignature);
Query query = new Druids.ScanQueryBuilder().resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST)
.eternityInterval()
.dataSource(new LookupDataSource("lookyloo"))
.columns(resultSignature.getColumnNames())
.columnTypes(resultSignature.getColumnTypes())
.orderBy(ImmutableList.of(OrderBy.ascending("v")))
.build();
DruidQuery druidQueryMock = buildDruidQueryMock(query, resultSignature);
// Act
msqTaskQueryMaker = getMSQTaskQueryMaker();
QueryResponse<Object[]> response = msqTaskQueryMaker.runQuery(druidQueryMock);
// Assert
String taskId = (String) Iterables.getOnlyElement(response.getResults().toList())[0];
MSQTaskReportPayload payload = (MSQTaskReportPayload) fakeOverlordClient.taskReportAsMap(taskId)
.get()
.get(MSQTaskReport.REPORT_KEY)
.getPayload();
// Assert
Assert.assertTrue(payload.getStatus().getStatus().isSuccess());
ImmutableList<Object[]> expectedResults = ImmutableList.of(
new Object[]{"mysteryvalue"},
new Object[]{"x6"},
new Object[]{"xa"},
new Object[]{"xabc"}
);
assertResultsEquals("select v from lookyloo", expectedResults, payload.getResults().getResults());
}

@Test
public void testJoinFailWithPolicyValidationOnLeftChild() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ public class FrameSegment implements Segment
private final FrameReader frameReader;
private final SegmentId segmentId;

public FrameSegment(Frame frame, FrameReader frameReader, String version)
{
this.frame = frame;
this.frameReader = frameReader;
this.segmentId = SegmentId.simple(SegmentId.DataSourceType.FRAME).withVersion(version);
}

/**
* @deprecated use {@link #FrameSegment(Frame, FrameReader, String)} instead
*/
@Deprecated
public FrameSegment(Frame frame, FrameReader frameReader, SegmentId segmentId)
{
this.frame = frame;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,18 @@ public class LookupSegment extends RowBasedSegment<Map.Entry<String, String>>
.add(LookupColumnSelectorFactory.VALUE_COLUMN, ColumnType.STRING)
.build();

/**
* Builds a {@link SegmentId} for a lookup segment.
*/
public static SegmentId buildSegmentId(String version)
{
return SegmentId.simple(SegmentId.DataSourceType.LOOKUP).withVersion(version);
}

public LookupSegment(final String lookupName, final LookupExtractorFactory lookupExtractorFactory)
{
super(
SegmentId.dummy(lookupName),
buildSegmentId(lookupName),
Sequences.simple(() -> {
final LookupExtractor extractor = lookupExtractorFactory.get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,12 +296,12 @@ public static SegmentAnalysis mergeAnalyses(
if (id1 != null && id2 != null) {
if (id2.getIntervalEnd().isAfter(id1.getIntervalEnd()) ||
(id2.getIntervalEnd().isEqual(id1.getIntervalEnd()) && id2.getPartitionNum() > id1.getPartitionNum())) {
mergedSegmentId = SegmentId.merged(dataSource, id2.getInterval(), id2.getPartitionNum());
mergedSegmentId = id2.withVersion(SegmentId.MERGED_VERSION);
final SegmentAnalysis tmp = arg1;
arg1 = arg2;
arg2 = tmp;
} else {
mergedSegmentId = SegmentId.merged(dataSource, id1.getInterval(), id1.getPartitionNum());
mergedSegmentId = id1.withVersion(SegmentId.MERGED_VERSION);
}
break;
}
Expand Down Expand Up @@ -426,7 +426,7 @@ public static SegmentAnalysis mergeAnalyses(
if (arg1.getId() != null && arg2.getId() != null && arg1.getId().equals(arg2.getId())) {
mergedId = arg1.getId();
} else {
mergedId = mergedSegmentId == null ? "merged" : mergedSegmentId.toString();
mergedId = mergedSegmentId == null ? SegmentId.MERGED_VERSION : mergedSegmentId.toString();
}

final Boolean rollup;
Expand Down
Loading
Loading