Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ protected void reduce(final BytesWritable key, Iterable<BytesWritable> values, f
dimOrder.addAll(index.getDimensionOrder());
log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
flushIndexToContextAndClose(key, index, context);
index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder, index.getColumnCapabilities());
index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder, index.getColumnHandlerCapabilities());
}

index.add(value);
Expand Down Expand Up @@ -752,7 +752,7 @@ public void doRun()
combiningAggs,
config,
allDimensionNames,
persistIndex.getColumnCapabilities()
persistIndex.getColumnHandlerCapabilities()
);
startTime = System.currentTimeMillis();
++indexCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,7 @@ private void processDimensionsSpec(final QueryableIndex index)
dimension,
createDimensionSchema(
dimension,
columnHolder.getCapabilities(),
columnHolder.getHandlerCapabilities(),
dimensionHandler.getMultivalueHandling()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
new ParallelIndexTuningConfig(
null,
null,
new OnheapIncrementalIndex.Spec(true),
new OnheapIncrementalIndex.Spec(true, false),
40000,
2000L,
null,
Expand Down Expand Up @@ -316,7 +316,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
),
new ClientCompactionTaskQueryTuningConfig(
100,
new OnheapIncrementalIndex.Spec(true),
new OnheapIncrementalIndex.Spec(true, false),
40000,
2000L,
30000L,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
@Override
public StringAnyVectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
{

ColumnCapabilities capabilities = selectorFactory.getColumnCapabilities(fieldName);
// null capabilities mean the column doesn't exist, so in vector engines the selector will never be multi-value
if (capabilities != null && capabilities.hasMultipleValues().isMaybeTrue()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,10 @@ public Expr apply(List<Expr> args)
final StructuredDataProcessor processor = new StructuredDataProcessor()
{
@Override
public int processLiteralField(ArrayList<NestedPathPart> fieldPath, Object fieldValue)
public StructuredDataProcessor.ProcessedLiteral<?> processLiteralField(ArrayList<NestedPathPart> fieldPath, Object fieldValue)
{
// do nothing, we only want the list of fields returned by this processor
return 0;
return StructuredDataProcessor.ProcessedLiteral.NULL_LITERAL;
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,15 @@ private ColumnAnalysis analyzeStringColumn(
} else if (capabilities.isDictionaryEncoded().isTrue()) {
// fallback if no bitmap index
try (BaseColumn column = columnHolder.getColumn()) {
DictionaryEncodedColumn<String> theColumn = (DictionaryEncodedColumn<String>) column;
cardinality = theColumn.getCardinality();
if (analyzingMinMax() && cardinality > 0) {
min = NullHandling.nullToEmptyIfNeeded(theColumn.lookupName(0));
max = NullHandling.nullToEmptyIfNeeded(theColumn.lookupName(cardinality - 1));
if (column instanceof DictionaryEncodedColumn) {
DictionaryEncodedColumn<String> theColumn = (DictionaryEncodedColumn<String>) column;
cardinality = theColumn.getCardinality();
if (analyzingMinMax() && cardinality > 0) {
min = NullHandling.nullToEmptyIfNeeded(theColumn.lookupName(0));
max = NullHandling.nullToEmptyIfNeeded(theColumn.lookupName(cardinality - 1));
}
} else {
cardinality = 0;
}
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ ColumnValueSelector<?> makeColumnValueSelector(
);

ColumnCapabilities getColumnCapabilities();

/**
 * Returns the {@link ColumnCapabilities} to use when creating a dimension handler for this column.
 * Defaults to delegating to {@link #getColumnCapabilities()}; implementations may override to
 * report handler-specific capabilities that differ from the column's general capabilities.
 */
default ColumnCapabilities getHandlerCapabilities()
{
  return getColumnCapabilities();
}

/**
* Compares the row values for this DimensionIndexer's dimension from a Row key.
*
Expand Down Expand Up @@ -301,6 +307,11 @@ boolean checkUnsortedEncodedKeyComponentsEqual(
* is needed to be able to correctly map per-segment encoded values to global values on the next conversion step,
* {@link DimensionMerger#convertSortedSegmentRowValuesToMergedRowValues}. The latter method requires sorted encoding
values on the input, because {@link DimensionMerger#writeMergedValueDictionary} takes sorted lookups as its input.
*
* For columns which do not use the {@link DimensionMerger} to merge dictionary encoded values, this method should
* provide a selector which is compatible with the expectations of
* {@link DimensionMerger#processMergedRow(ColumnValueSelector)}, which might simply be to pass-through the 'unsorted'
* selector.
*/
ColumnValueSelector convertUnsortedValuesToSorted(ColumnValueSelector selectorWithUnsortedValues);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
import org.apache.druid.segment.column.ColumnDescriptor;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.TypeSignature;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.incremental.IncrementalIndex;
Expand Down Expand Up @@ -262,10 +263,9 @@ private File makeIndexFiles(
log.debug("Completed factory.json in %,d millis", System.currentTimeMillis() - startTime);

progress.progress();
final Map<String, ValueType> metricsValueTypes = new TreeMap<>(Comparators.naturalNullsFirst());
final Map<String, String> metricTypeNames = new TreeMap<>(Comparators.naturalNullsFirst());
final Map<String, TypeSignature<ValueType>> metricTypes = new TreeMap<>(Comparators.naturalNullsFirst());
final List<ColumnCapabilities> dimCapabilities = Lists.newArrayListWithCapacity(mergedDimensions.size());
mergeCapabilities(adapters, mergedDimensions, metricsValueTypes, metricTypeNames, dimCapabilities);
mergeCapabilities(adapters, mergedDimensions, metricTypes, dimCapabilities);

final Map<String, DimensionHandler> handlers = makeDimensionHandlers(mergedDimensions, dimCapabilities);
final List<DimensionMergerV9> mergers = new ArrayList<>();
Expand Down Expand Up @@ -301,7 +301,7 @@ private File makeIndexFiles(
closer.register(timeAndDimsIterator);
final GenericColumnSerializer timeWriter = setupTimeWriter(segmentWriteOutMedium, indexSpec);
final ArrayList<GenericColumnSerializer> metricWriters =
setupMetricsWriters(segmentWriteOutMedium, mergedMetrics, metricsValueTypes, metricTypeNames, indexSpec);
setupMetricsWriters(segmentWriteOutMedium, mergedMetrics, metricTypes, indexSpec);
IndexMergeResult indexMergeResult = mergeIndexesAndWriteColumns(
adapters,
progress,
Expand All @@ -320,8 +320,7 @@ private File makeIndexFiles(
v9Smoosher,
progress,
mergedMetrics,
metricsValueTypes,
metricTypeNames,
metricTypes,
metricWriters,
indexSpec
);
Expand Down Expand Up @@ -494,8 +493,7 @@ private void makeMetricsColumns(
final FileSmoosher v9Smoosher,
final ProgressIndicator progress,
final List<String> mergedMetrics,
final Map<String, ValueType> metricsValueTypes,
final Map<String, String> metricTypeNames,
final Map<String, TypeSignature<ValueType>> metricsTypes,
final List<GenericColumnSerializer> metWriters,
final IndexSpec indexSpec
) throws IOException
Expand All @@ -510,8 +508,8 @@ private void makeMetricsColumns(
GenericColumnSerializer writer = metWriters.get(i);

final ColumnDescriptor.Builder builder = ColumnDescriptor.builder();
ValueType type = metricsValueTypes.get(metric);
switch (type) {
TypeSignature<ValueType> type = metricsTypes.get(metric);
switch (type.getType()) {
case LONG:
builder.setValueType(ValueType.LONG);
builder.addSerde(createLongColumnPartSerde(writer, indexSpec));
Expand All @@ -525,7 +523,7 @@ private void makeMetricsColumns(
builder.addSerde(createDoubleColumnPartSerde(writer, indexSpec));
break;
case COMPLEX:
final String typeName = metricTypeNames.get(metric);
final String typeName = type.getComplexTypeName();
builder.setValueType(ValueType.COMPLEX);
builder.addSerde(
ComplexColumnPartSerde
Expand Down Expand Up @@ -686,7 +684,7 @@ private IndexMergeResult mergeIndexesAndWriteColumns(
}

for (int dimIndex = 0; dimIndex < timeAndDims.getNumDimensions(); dimIndex++) {
DimensionMerger merger = mergers.get(dimIndex);
DimensionMergerV9 merger = mergers.get(dimIndex);
if (merger.hasOnlyNulls()) {
continue;
}
Expand Down Expand Up @@ -763,17 +761,16 @@ private GenericColumnSerializer setupTimeWriter(
private ArrayList<GenericColumnSerializer> setupMetricsWriters(
final SegmentWriteOutMedium segmentWriteOutMedium,
final List<String> mergedMetrics,
final Map<String, ValueType> metricsValueTypes,
final Map<String, String> metricTypeNames,
final Map<String, TypeSignature<ValueType>> metricsTypes,
final IndexSpec indexSpec
) throws IOException
{
ArrayList<GenericColumnSerializer> metWriters = Lists.newArrayListWithCapacity(mergedMetrics.size());

for (String metric : mergedMetrics) {
ValueType type = metricsValueTypes.get(metric);
TypeSignature<ValueType> type = metricsTypes.get(metric);
GenericColumnSerializer writer;
switch (type) {
switch (type.getType()) {
case LONG:
writer = createLongColumnSerializer(segmentWriteOutMedium, metric, indexSpec);
break;
Expand All @@ -784,10 +781,9 @@ private ArrayList<GenericColumnSerializer> setupMetricsWriters(
writer = createDoubleColumnSerializer(segmentWriteOutMedium, metric, indexSpec);
break;
case COMPLEX:
final String typeName = metricTypeNames.get(metric);
ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(typeName);
ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(type.getComplexTypeName());
if (serde == null) {
throw new ISE("Unknown type[%s]", typeName);
throw new ISE("Unknown type[%s]", type.getComplexTypeName());
}
writer = serde.getSerializer(segmentWriteOutMedium, metric);
break;
Expand Down Expand Up @@ -897,8 +893,7 @@ private void writeDimValuesAndSetupDimConversion(
private void mergeCapabilities(
final List<IndexableAdapter> adapters,
final List<String> mergedDimensions,
final Map<String, ValueType> metricsValueTypes,
final Map<String, String> metricTypeNames,
final Map<String, TypeSignature<ValueType>> metricTypes,
final List<ColumnCapabilities> dimCapabilities
)
{
Expand All @@ -911,12 +906,11 @@ private void mergeCapabilities(
);
}
for (String metric : adapter.getMetricNames()) {
ColumnCapabilities capabilities = adapter.getCapabilities(metric);
capabilitiesMap.compute(metric, (m, existingCapabilities) ->
final ColumnCapabilities capabilities = adapter.getCapabilities(metric);
final ColumnCapabilities merged = capabilitiesMap.compute(metric, (m, existingCapabilities) ->
mergeCapabilities(capabilities, existingCapabilities, METRIC_CAPABILITY_MERGE_LOGIC)
);
metricsValueTypes.put(metric, capabilities.getType());
metricTypeNames.put(metric, adapter.getMetricType(metric));
metricTypes.put(metric, merged);
}
}
for (String dim : mergedDimensions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ public interface IndexableAdapter

BitmapValues getBitmapValues(String dimension, int dictId);

String getMetricType(String metric);

ColumnCapabilities getCapabilities(String column);

Metadata getMetadata();
Expand Down
Loading