Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ public static SerializeResult toBytes(
writeString(k, out);

try (Aggregator agg = aggFactory.factorize(
IncrementalIndex.makeColumnSelectorFactory(RowSignature::empty, VirtualColumns.EMPTY, aggFactory, supplier, true)
IncrementalIndex.makeColumnSelectorFactory(RowSignature.empty(), VirtualColumns.EMPTY, aggFactory, supplier, true)
)) {
try {
agg.aggregate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ public ColumnCapabilities getColumnCapabilities(String column)
return RowBasedColumnSelectorFactory.create(
adapter,
supplier::get,
() -> decoratedSignature,
decoratedSignature,
false
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private Result<TimeseriesResultValue> getNullTimeseriesResultValue(TimeseriesQue
RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
() -> new MapBasedRow(null, null),
() -> aggregatorsSignature,
aggregatorsSignature,
false
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
@Nullable
private final LongSupplier rowIdSupplier;
private final RowAdapter<T> adapter;
private final Supplier<ColumnInspector> columnInspectorSupplier;
private final ColumnInspector columnInspector;
private final boolean throwParseExceptions;

/**
Expand All @@ -67,15 +67,15 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
final Supplier<T> rowSupplier,
@Nullable final LongSupplier rowIdSupplier,
final RowAdapter<T> adapter,
final Supplier<ColumnInspector> columnInspectorSupplier,
final ColumnInspector columnInspector,
final boolean throwParseExceptions
)
{
this.rowSupplier = rowSupplier;
this.rowIdSupplier = rowIdSupplier;
this.adapter = adapter;
this.columnInspectorSupplier =
Preconditions.checkNotNull(columnInspectorSupplier, "columnInspectorSupplier must be nonnull");
this.columnInspector =
Preconditions.checkNotNull(columnInspector, "columnInspector must be nonnull");
this.throwParseExceptions = throwParseExceptions;
}

Expand All @@ -84,7 +84,7 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
*
* @param adapter adapter for these row objects
* @param supplier supplier of row objects
* @param columnInspectorSupplier will be used for reporting available columns and their capabilities. Note that this
* @param columnInspector will be used for reporting available columns and their capabilities. Note that this
* factory will still allow creation of selectors on any named field in the rows, even if
* it doesn't appear in "columnInspector". (It only needs to be accessible via
* {@link RowAdapter#columnFunction}.) As a result, you can achieve an untyped mode by
Expand All @@ -95,11 +95,11 @@ public class RowBasedColumnSelectorFactory<T> implements ColumnSelectorFactory
public static <RowType> RowBasedColumnSelectorFactory<RowType> create(
final RowAdapter<RowType> adapter,
final Supplier<RowType> supplier,
final Supplier<ColumnInspector> columnInspectorSupplier,
final ColumnInspector columnInspector,
final boolean throwParseExceptions
)
{
return new RowBasedColumnSelectorFactory<>(supplier, null, adapter, columnInspectorSupplier, throwParseExceptions);
return new RowBasedColumnSelectorFactory<>(supplier, null, adapter, columnInspector, throwParseExceptions);
}

@Nullable
Expand Down Expand Up @@ -495,6 +495,6 @@ private Number getCurrentValueAsNumber()
@Override
public ColumnCapabilities getColumnCapabilities(String columnName)
{
return getColumnCapabilities(columnInspectorSupplier.get(), columnName);
return getColumnCapabilities(columnInspector, columnName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class RowBasedCursor<RowType> implements Cursor
rowWalker::currentRow,
() -> rowId,
rowAdapter,
() -> rowSignature,
rowSignature,
false
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.AbstractIndex;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.DimensionHandler;
Expand All @@ -68,7 +69,6 @@
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.serde.ComplexMetricExtractor;
import org.apache.druid.segment.serde.ComplexMetricSerde;
Expand Down Expand Up @@ -97,7 +97,7 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

public abstract class IncrementalIndex extends AbstractIndex implements Iterable<Row>, Closeable
public abstract class IncrementalIndex extends AbstractIndex implements Iterable<Row>, Closeable, ColumnInspector
{
/**
* Column selector used at ingestion time for inputs to aggregators.
Expand All @@ -109,7 +109,7 @@ public abstract class IncrementalIndex extends AbstractIndex implements Iterable
* @return column selector factory
*/
public static ColumnSelectorFactory makeColumnSelectorFactory(
final Supplier<RowSignature> rowSignatureSupplier,
final ColumnInspector columnInspector,
final VirtualColumns virtualColumns,
final AggregatorFactory agg,
final Supplier<InputRow> in,
Expand All @@ -119,7 +119,7 @@ public static ColumnSelectorFactory makeColumnSelectorFactory(
final RowBasedColumnSelectorFactory<InputRow> baseSelectorFactory = RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
in::get,
rowSignatureSupplier::get,
columnInspector,
true
);

Expand Down Expand Up @@ -437,10 +437,27 @@ public Map<String, ColumnCapabilities> getColumnCapabilities()
ImmutableMap.Builder<String, ColumnCapabilities> builder =
ImmutableMap.<String, ColumnCapabilities>builder().putAll(timeAndMetricsColumnCapabilities);

dimensionDescs.forEach((dimension, desc) -> builder.put(dimension, desc.getCapabilities()));
synchronized (dimensionDescs) {
dimensionDescs.forEach((dimension, desc) -> builder.put(dimension, desc.getCapabilities()));
}
return builder.build();
}

@Nullable
@Override
public ColumnCapabilities getColumnCapabilities(String columnName)
{
if (timeAndMetricsColumnCapabilities.containsKey(columnName)) {
return timeAndMetricsColumnCapabilities.get(columnName);
}
synchronized (dimensionDescs) {
if (dimensionDescs.containsKey(columnName)) {
return dimensionDescs.get(columnName).getCapabilities();
}
}
return null;
}

/**
* Adds a new row. The row might correspond with another row that already exists, in which case this will
* update that row instead of inserting a new one.
Expand Down Expand Up @@ -847,15 +864,6 @@ public StorageAdapter toStorageAdapter()
return new IncrementalIndexStorageAdapter(this);
}

@Nullable
public ColumnCapabilities getCapabilities(String column)
{
if (dimensionDescs.containsKey(column)) {
return dimensionDescs.get(column).getCapabilities();
}
return timeAndMetricsColumnCapabilities.get(column);
}

public Metadata getMetadata()
{
return metadata;
Expand Down Expand Up @@ -987,15 +995,7 @@ protected ColumnSelectorFactory makeColumnSelectorFactory(
final boolean deserializeComplexMetrics
)
{
Supplier<RowSignature> signatureSupplier = () -> {
Map<String, ColumnCapabilities> capabilitiesMap = getColumnCapabilities();
RowSignature.Builder bob = RowSignature.builder();
for (Map.Entry<String, ColumnCapabilities> capabilitiesEntry : capabilitiesMap.entrySet()) {
bob.add(capabilitiesEntry.getKey(), capabilitiesEntry.getValue().toColumnType());
}
return bob.build();
};
return makeColumnSelectorFactory(signatureSupplier, virtualColumns, agg, in, deserializeComplexMetrics);
return makeColumnSelectorFactory(this, virtualColumns, agg, in, deserializeComplexMetrics);
}

protected final Comparator<IncrementalIndexRow> dimsComparator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public String getMetricType(String metric)
@Override
public ColumnCapabilities getCapabilities(String column)
{
return index.getCapabilities(column);
return index.getColumnCapabilities(column);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public Comparable getMaxValue(String column)
@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
// Different from index.getCapabilities because, in a way, IncrementalIndex's string-typed dimensions
// Different from index.getColumnCapabilities because, in a way, IncrementalIndex's string-typed dimensions
// are always potentially multi-valued at query time. (Missing / null values for a row can potentially be
// represented by an empty array; see StringDimensionIndexer.IndexerDimensionSelector's getRow method.)
//
Expand All @@ -222,7 +222,10 @@ public ColumnCapabilities getColumnCapabilities(String column)
// to the StringDimensionIndexer so the selector built on top of it can produce values from the snapshot state of
// multi-valuedness at cursor creation time, instead of the latest state, and getSnapshotColumnCapabilities could
// be removed.
return ColumnCapabilitiesImpl.snapshot(index.getCapabilities(column), STORAGE_ADAPTER_CAPABILITIES_COERCE_LOGIC);
return ColumnCapabilitiesImpl.snapshot(
index.getColumnCapabilities(column),
STORAGE_ADAPTER_CAPABILITIES_COERCE_LOGIC
);
}

/**
Expand All @@ -234,7 +237,7 @@ public ColumnCapabilities getColumnCapabilities(String column)
public ColumnCapabilities getSnapshotColumnCapabilities(String column)
{
return ColumnCapabilitiesImpl.snapshot(
index.getCapabilities(column),
index.getColumnCapabilities(column),
SNAPSHOT_STORAGE_ADAPTER_CAPABILITIES_COERCE_LOGIC
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class Transformer
RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
rowSupplierForValueMatcher::get,
RowSignature::empty, // sad
RowSignature.empty(), // sad
false
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public void testContainsNullWhenValuesSetIsTreeSet()
final RowBasedColumnSelectorFactory<MapBasedRow> columnSelectorFactory = RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
() -> new MapBasedRow(0, row),
() -> RowSignature.builder().add("dim", ColumnType.STRING).build(),
RowSignature.builder().add("dim", ColumnType.STRING).build(),
true
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ public static void teardown()
public void testNumericColumns()
{
// incremental index
assertNonStringColumnCapabilities(INC_INDEX.getCapabilities(ColumnHolder.TIME_COLUMN_NAME), ColumnType.LONG);
assertNonStringColumnCapabilities(INC_INDEX.getCapabilities("d3"), ColumnType.DOUBLE);
assertNonStringColumnCapabilities(INC_INDEX.getCapabilities("d4"), ColumnType.FLOAT);
assertNonStringColumnCapabilities(INC_INDEX.getCapabilities("d5"), ColumnType.LONG);
assertNonStringColumnCapabilities(INC_INDEX.getCapabilities("m1"), ColumnType.DOUBLE);
assertNonStringColumnCapabilities(INC_INDEX.getCapabilities("m2"), ColumnType.FLOAT);
assertNonStringColumnCapabilities(INC_INDEX.getCapabilities("m3"), ColumnType.LONG);
assertNonStringColumnCapabilities(INC_INDEX.getColumnCapabilities(ColumnHolder.TIME_COLUMN_NAME), ColumnType.LONG);
assertNonStringColumnCapabilities(INC_INDEX.getColumnCapabilities("d3"), ColumnType.DOUBLE);
assertNonStringColumnCapabilities(INC_INDEX.getColumnCapabilities("d4"), ColumnType.FLOAT);
assertNonStringColumnCapabilities(INC_INDEX.getColumnCapabilities("d5"), ColumnType.LONG);
assertNonStringColumnCapabilities(INC_INDEX.getColumnCapabilities("m1"), ColumnType.DOUBLE);
assertNonStringColumnCapabilities(INC_INDEX.getColumnCapabilities("m2"), ColumnType.FLOAT);
assertNonStringColumnCapabilities(INC_INDEX.getColumnCapabilities("m3"), ColumnType.LONG);

// segment index
assertNonStringColumnCapabilities(
Expand All @@ -186,15 +186,15 @@ public void testNumericColumnsWithNulls()
// incremental index
// time does not have nulls
assertNonStringColumnCapabilities(
INC_INDEX_WITH_NULLS.getCapabilities(ColumnHolder.TIME_COLUMN_NAME),
INC_INDEX_WITH_NULLS.getColumnCapabilities(ColumnHolder.TIME_COLUMN_NAME),
ColumnType.LONG
);
assertNonStringColumnCapabilitiesWithNulls(INC_INDEX_WITH_NULLS.getCapabilities("d3"), ColumnType.DOUBLE);
assertNonStringColumnCapabilitiesWithNulls(INC_INDEX_WITH_NULLS.getCapabilities("d4"), ColumnType.FLOAT);
assertNonStringColumnCapabilitiesWithNulls(INC_INDEX_WITH_NULLS.getCapabilities("d5"), ColumnType.LONG);
assertNonStringColumnCapabilitiesWithNulls(INC_INDEX_WITH_NULLS.getCapabilities("m1"), ColumnType.DOUBLE);
assertNonStringColumnCapabilitiesWithNulls(INC_INDEX_WITH_NULLS.getCapabilities("m2"), ColumnType.FLOAT);
assertNonStringColumnCapabilitiesWithNulls(INC_INDEX_WITH_NULLS.getCapabilities("m3"), ColumnType.LONG);
assertNonStringColumnCapabilitiesWithNulls(INC_INDEX_WITH_NULLS.getColumnCapabilities("d3"), ColumnType.DOUBLE);
assertNonStringColumnCapabilitiesWithNulls(INC_INDEX_WITH_NULLS.getColumnCapabilities("d4"), ColumnType.FLOAT);
assertNonStringColumnCapabilitiesWithNulls(INC_INDEX_WITH_NULLS.getColumnCapabilities("d5"), ColumnType.LONG);
assertNonStringColumnCapabilitiesWithNulls(INC_INDEX_WITH_NULLS.getColumnCapabilities("m1"), ColumnType.DOUBLE);
assertNonStringColumnCapabilitiesWithNulls(INC_INDEX_WITH_NULLS.getColumnCapabilities("m2"), ColumnType.FLOAT);
assertNonStringColumnCapabilitiesWithNulls(INC_INDEX_WITH_NULLS.getColumnCapabilities("m3"), ColumnType.LONG);

// segment index
assertNonStringColumnCapabilities(
Expand Down Expand Up @@ -230,7 +230,7 @@ public void testNumericColumnsWithNulls()
@Test
public void testStringColumn()
{
ColumnCapabilities caps = INC_INDEX.getCapabilities("d1");
ColumnCapabilities caps = INC_INDEX.getColumnCapabilities("d1");
Assert.assertEquals(ValueType.STRING, caps.getType());
Assert.assertTrue(caps.hasBitmapIndexes());
Assert.assertTrue(caps.isDictionaryEncoded().isMaybeTrue());
Expand Down Expand Up @@ -265,7 +265,7 @@ public void testStringColumn()
@Test
public void testStringColumnWithNulls()
{
ColumnCapabilities caps = INC_INDEX_WITH_NULLS.getCapabilities("d1");
ColumnCapabilities caps = INC_INDEX_WITH_NULLS.getColumnCapabilities("d1");
Assert.assertEquals(ValueType.STRING, caps.getType());
Assert.assertTrue(caps.hasBitmapIndexes());
Assert.assertTrue(caps.isDictionaryEncoded().isTrue());
Expand Down Expand Up @@ -298,7 +298,7 @@ public void testStringColumnWithNulls()
@Test
public void testMultiStringColumn()
{
ColumnCapabilities caps = INC_INDEX.getCapabilities("d2");
ColumnCapabilities caps = INC_INDEX.getColumnCapabilities("d2");
Assert.assertEquals(ValueType.STRING, caps.getType());
Assert.assertTrue(caps.hasBitmapIndexes());
Assert.assertTrue(caps.isDictionaryEncoded().isTrue());
Expand All @@ -323,7 +323,7 @@ public void testMultiStringColumn()
@Test
public void testMultiStringColumnWithNulls()
{
ColumnCapabilities caps = INC_INDEX_WITH_NULLS.getCapabilities("d2");
ColumnCapabilities caps = INC_INDEX_WITH_NULLS.getColumnCapabilities("d2");
Assert.assertEquals(ValueType.STRING, caps.getType());
Assert.assertTrue(caps.hasBitmapIndexes());
Assert.assertTrue(caps.isDictionaryEncoded().isTrue());
Expand All @@ -347,10 +347,10 @@ public void testMultiStringColumnWithNulls()
@Test
public void testComplexColumn()
{
assertComplexColumnCapabilites(INC_INDEX.getCapabilities("m4"));
assertComplexColumnCapabilites(INC_INDEX.getColumnCapabilities("m4"));
assertComplexColumnCapabilites(MMAP_INDEX.getColumnHolder("m4").getCapabilities());
// results for this complex aren't different, we only know that nullability is unknown
assertComplexColumnCapabilites(INC_INDEX_WITH_NULLS.getCapabilities("m4"));
assertComplexColumnCapabilites(INC_INDEX_WITH_NULLS.getColumnCapabilities("m4"));
assertComplexColumnCapabilites(MMAP_INDEX_WITH_NULLS.getColumnHolder("m4").getCapabilities());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ private List<String> selectColumnValuesMatchingFilterUsingRowBasedColumnSelector
RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
rowSupplier::get,
rowSignatureBuilder::build,
rowSignatureBuilder.build(),
false
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public class ExpressionVirtualColumnTest extends InitializedNullHandlingTest
private static final ColumnSelectorFactory COLUMN_SELECTOR_FACTORY = RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
CURRENT_ROW::get,
RowSignature::empty,
RowSignature.empty(),
false
);

Expand Down Expand Up @@ -745,7 +745,7 @@ public void testExprEvalSelectorWithLongsAndNulls()
RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
CURRENT_ROW::get,
RowSignature.builder().add("x", ColumnType.LONG)::build,
RowSignature.builder().add("x", ColumnType.LONG).build(),
false
),
Parser.parse(SCALE_LONG.getExpression(), TestExprMacroTable.INSTANCE)
Expand All @@ -768,7 +768,7 @@ public void testExprEvalSelectorWithDoublesAndNulls()
RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
CURRENT_ROW::get,
RowSignature.builder().add("x", ColumnType.DOUBLE)::build,
RowSignature.builder().add("x", ColumnType.DOUBLE).build(),
false
),
Parser.parse(SCALE_FLOAT.getExpression(), TestExprMacroTable.INSTANCE)
Expand All @@ -791,7 +791,7 @@ public void testExprEvalSelectorWithFloatAndNulls()
RowBasedColumnSelectorFactory.create(
RowAdapters.standardRow(),
CURRENT_ROW::get,
RowSignature.builder().add("x", ColumnType.FLOAT)::build,
RowSignature.builder().add("x", ColumnType.FLOAT).build(),
false
),
Parser.parse(SCALE_FLOAT.getExpression(), TestExprMacroTable.INSTANCE)
Expand Down
Loading