diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 414a9a4e3cc8..884c40e7f180 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; @@ -46,6 +47,7 @@ import io.druid.segment.BaseProgressIndicator; import io.druid.segment.ProgressIndicator; import io.druid.segment.QueryableIndex; +import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.OnheapIncrementalIndex; @@ -75,8 +77,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; -import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; @@ -216,7 +218,7 @@ private static IncrementalIndex makeIncrementalIndex( Bucket theBucket, AggregatorFactory[] aggs, HadoopDruidIndexerConfig config, - Iterable oldDimOrder + Map oldDimOrder ) { final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig(); @@ -334,10 +336,10 @@ protected void reduce( BytesWritable first = iter.next(); if (iter.hasNext()) { - LinkedHashSet dimOrder = Sets.newLinkedHashSet(); + Map dimOrder = Maps.newLinkedHashMap(); SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs; - IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null); + IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null); index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators)); while (iter.hasNext()) { @@ -345,7 +347,11 @@ protected void reduce( InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators); if (!index.canAppendRow()) { - dimOrder.addAll(index.getDimensionOrder()); + for (IncrementalIndex.DimensionDesc desc : index.getDimensions()) { + if (!dimOrder.containsKey(desc.getName())) { + dimOrder.put(desc.getName(), desc.getCapabilities()); + } + } log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason()); flushIndexToContextAndClose(key, index, context); index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder); @@ -548,7 +554,7 @@ protected void reduce( ListeningExecutorService persistExecutor = null; List> persistFutures = Lists.newArrayList(); - IncrementalIndex index = makeIncrementalIndex( + IncrementalIndex index = makeIncrementalIndex( bucket, combiningAggs, config, @@ -565,7 +571,7 @@ protected void reduce( int runningTotalLineCount = 0; long startTime = System.currentTimeMillis(); - Set allDimensionNames = Sets.newLinkedHashSet(); + Map allDimensionNames = Maps.newLinkedHashMap(); final ProgressIndicator progressIndicator = makeProgressIndicator(context); int numBackgroundPersistThreads = config.getSchema().getTuningConfig().getNumBackgroundPersistThreads(); if (numBackgroundPersistThreads > 0) { @@ -606,7 +612,11 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) ++lineCount; if (!index.canAppendRow()) { - allDimensionNames.addAll(index.getDimensionOrder()); + for (IncrementalIndex.DimensionDesc desc : index.getDimensions()) { + if (!allDimensionNames.containsKey(desc.getName())) { + allDimensionNames.put(desc.getName(), desc.getCapabilities()); + } + } log.info(index.getOutOfRowsReason()); log.info( @@ -656,7 +666,11 @@ public void doRun() } } - allDimensionNames.addAll(index.getDimensionOrder()); + for (IncrementalIndex.DimensionDesc desc : index.getDimensions()) { + if (!allDimensionNames.containsKey(desc.getName())) { + allDimensionNames.put(desc.getName(), desc.getCapabilities()); + } + } log.info("%,d lines completed.", lineCount); @@ -694,7 +708,7 @@ indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator interval, config.getSchema().getTuningConfig().getVersion(), null, - ImmutableList.copyOf(allDimensionNames), + ImmutableList.copyOf(allDimensionNames.keySet()), metricNames, config.getShardSpec(bucket).getActualSpec(), -1, diff --git a/processing/src/main/java/io/druid/segment/column/ColumnCapabilities.java b/processing/src/main/java/io/druid/segment/column/ColumnCapabilities.java index f6b416415326..d8955ba694c4 100644 --- a/processing/src/main/java/io/druid/segment/column/ColumnCapabilities.java +++ b/processing/src/main/java/io/druid/segment/column/ColumnCapabilities.java @@ -31,5 +31,7 @@ public interface ColumnCapabilities public boolean hasSpatialIndexes(); public boolean hasMultipleValues(); + public ColumnCapabilities clone(); + public ColumnCapabilitiesImpl merge(ColumnCapabilities other); } diff --git a/processing/src/main/java/io/druid/segment/column/ColumnCapabilitiesImpl.java b/processing/src/main/java/io/druid/segment/column/ColumnCapabilitiesImpl.java index f8b14e37ceed..0a97c72d0e91 100644 --- a/processing/src/main/java/io/druid/segment/column/ColumnCapabilitiesImpl.java +++ b/processing/src/main/java/io/druid/segment/column/ColumnCapabilitiesImpl.java @@ -105,6 +105,19 @@ public boolean hasMultipleValues() return hasMultipleValues; } + @Override + public ColumnCapabilities clone() + { + ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); + capabilities.type = type; + capabilities.dictionaryEncoded = dictionaryEncoded; + capabilities.runLengthEncoded = runLengthEncoded; + capabilities.hasInvertedIndexes = hasInvertedIndexes; + capabilities.hasSpatialIndexes = hasSpatialIndexes; + capabilities.hasMultipleValues = hasMultipleValues; + return capabilities; + } + public ColumnCapabilitiesImpl setHasMultipleValues(boolean hasMultipleValues) { this.hasMultipleValues = hasMultipleValues; diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 673a79d66868..ff8f2c78d313 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -800,19 +800,25 @@ public List getDimensionOrder() * Index dimension ordering could be changed to initialize from DimensionsSpec after resolution of * https://github.com/druid-io/druid/issues/2011 */ - public void loadDimensionIterable(Iterable oldDimensionOrder) + public void loadDimensionIterable(Map oldDimensionOrder) { synchronized (dimensionDescs) { if (!dimensionDescs.isEmpty()) { throw new ISE("Cannot load dimension order when existing order[%s] is not empty.", dimensionDescs.keySet()); } - for (String dim : oldDimensionOrder) { - if (dimensionDescs.get(dim) == null) { - ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(ValueType.STRING); - columnCapabilities.put(dim, capabilities); - addNewDimension(dim, capabilities); + for (Map.Entry entry : oldDimensionOrder.entrySet()) { + String dimension = entry.getKey(); + ColumnCapabilities prev = entry.getValue(); + if (dimensionDescs.get(dimension) != null) { + continue; } + ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension); + if (capabilities == null) { + capabilities = (ColumnCapabilitiesImpl) prev.clone(); + capabilities.setHasMultipleValues(false); + } + columnCapabilities.put(dimension, capabilities); + addNewDimension(dimension, capabilities); } } } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 91326192fdd9..11c3eae590e8 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -24,12 +24,13 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; +import com.google.common.collect.Maps; import com.metamx.common.IAE; import com.metamx.common.ISE; import io.druid.data.input.InputRow; import io.druid.query.aggregation.AggregatorFactory; import io.druid.segment.QueryableIndex; +import io.druid.segment.column.ColumnCapabilities; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.incremental.IndexSizeExceededException; @@ -43,8 +44,8 @@ import javax.annotation.Nullable; import java.util.Arrays; import java.util.Iterator; -import java.util.LinkedHashSet; import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -60,7 +61,7 @@ public class Sink implements Iterable private final int maxRowsInMemory; private final boolean reportParseExceptions; private final CopyOnWriteArrayList hydrants = new CopyOnWriteArrayList(); - private final LinkedHashSet dimOrder = Sets.newLinkedHashSet(); + private final Map dimOrder = Maps.newLinkedHashMap(); private final AtomicInteger numRowsExcludingCurrIndex = new AtomicInteger(); private volatile FireHydrant currHydrant; private volatile boolean writable = true; @@ -251,11 +252,17 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) if (lastHydrant.hasSwapped()) { QueryableIndex oldIndex = lastHydrant.getSegment().asQueryableIndex(); for (String dim : oldIndex.getAvailableDimensions()) { - dimOrder.add(dim); + if (!dimOrder.containsKey(dim)) { + dimOrder.put(dim, oldIndex.getColumn(dim).getCapabilities()); + } } } else { - IncrementalIndex oldIndex = lastHydrant.getIndex(); - dimOrder.addAll(oldIndex.getDimensionOrder()); + IncrementalIndex oldIndex = lastHydrant.getIndex(); + for (IncrementalIndex.DimensionDesc desc : oldIndex.getDimensions()) { + if (!dimOrder.containsKey(desc.getName())) { + dimOrder.put(desc.getName(), desc.getCapabilities()); + } + } } newIndex.loadDimensionIterable(dimOrder); }