@@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
@@ -46,6 +47,7 @@
import io.druid.segment.BaseProgressIndicator;
import io.druid.segment.ProgressIndicator;
import io.druid.segment.QueryableIndex;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.OnheapIncrementalIndex;
@@ -75,8 +77,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
@@ -216,7 +218,7 @@ private static IncrementalIndex makeIncrementalIndex(
Bucket theBucket,
AggregatorFactory[] aggs,
HadoopDruidIndexerConfig config,
Iterable<String> oldDimOrder
Map<String, ColumnCapabilities> oldDimOrder
)
{
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
@@ -334,18 +336,22 @@ protected void reduce(
BytesWritable first = iter.next();

if (iter.hasNext()) {
LinkedHashSet<String> dimOrder = Sets.newLinkedHashSet();
Map<String, ColumnCapabilities> dimOrder = Maps.newLinkedHashMap();
SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null);
IncrementalIndex<?> index = makeIncrementalIndex(bucket, combiningAggs, config, null);
index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators));

while (iter.hasNext()) {
context.progress();
InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators);

if (!index.canAppendRow()) {
dimOrder.addAll(index.getDimensionOrder());
for (IncrementalIndex.DimensionDesc desc : index.getDimensions()) {
if (!dimOrder.containsKey(desc.getName())) {
dimOrder.put(desc.getName(), desc.getCapabilities());
}
}
log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
flushIndexToContextAndClose(key, index, context);
index = makeIncrementalIndex(bucket, combiningAggs, config, dimOrder);
@@ -548,7 +554,7 @@ protected void reduce(

ListeningExecutorService persistExecutor = null;
List<ListenableFuture<?>> persistFutures = Lists.newArrayList();
IncrementalIndex index = makeIncrementalIndex(
IncrementalIndex<?> index = makeIncrementalIndex(
bucket,
combiningAggs,
config,
@@ -565,7 +571,7 @@
int runningTotalLineCount = 0;
long startTime = System.currentTimeMillis();

Set<String> allDimensionNames = Sets.newLinkedHashSet();
Map<String, ColumnCapabilities> allDimensionNames = Maps.newLinkedHashMap();
final ProgressIndicator progressIndicator = makeProgressIndicator(context);
int numBackgroundPersistThreads = config.getSchema().getTuningConfig().getNumBackgroundPersistThreads();
if (numBackgroundPersistThreads > 0) {
@@ -606,7 +612,11 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
++lineCount;

if (!index.canAppendRow()) {
allDimensionNames.addAll(index.getDimensionOrder());
for (IncrementalIndex.DimensionDesc desc : index.getDimensions()) {
if (!allDimensionNames.containsKey(desc.getName())) {
allDimensionNames.put(desc.getName(), desc.getCapabilities());
}
}

log.info(index.getOutOfRowsReason());
log.info(
@@ -656,7 +666,11 @@ public void doRun()
}
}

allDimensionNames.addAll(index.getDimensionOrder());
for (IncrementalIndex.DimensionDesc desc : index.getDimensions()) {
if (!allDimensionNames.containsKey(desc.getName())) {
allDimensionNames.put(desc.getName(), desc.getCapabilities());
}
}

log.info("%,d lines completed.", lineCount);

@@ -694,7 +708,7 @@ indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
interval,
config.getSchema().getTuningConfig().getVersion(),
null,
ImmutableList.copyOf(allDimensionNames),
ImmutableList.copyOf(allDimensionNames.keySet()),
metricNames,
config.getShardSpec(bucket).getActualSpec(),
-1,
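For context, a minimal sketch of the pattern both reduce() paths above now follow. It is not part of the patch, and the helper name mergeDimensionCapabilities is hypothetical, but the IncrementalIndex calls (getDimensions(), DimensionDesc.getName(), DimensionDesc.getCapabilities()) are the ones used in the diff: before a full in-memory index is flushed, its dimension names and their ColumnCapabilities are captured in insertion order so the replacement index (and the final segment metadata) can be seeded with the same dimensions.

// Hypothetical helper illustrating the capability handoff used in reduce() above.
private static void mergeDimensionCapabilities(
    IncrementalIndex<?> index,
    Map<String, ColumnCapabilities> dimOrder
)
{
  for (IncrementalIndex.DimensionDesc desc : index.getDimensions()) {
    // the first index to see a dimension fixes its position and capabilities
    if (!dimOrder.containsKey(desc.getName())) {
      dimOrder.put(desc.getName(), desc.getCapabilities());
    }
  }
}

Each of the expanded loops above is an inline copy of this pattern, run just before the full index is flushed or persisted and a new one is created via makeIncrementalIndex(bucket, combiningAggs, config, dimOrder).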
@@ -31,5 +31,7 @@ public interface ColumnCapabilities
public boolean hasSpatialIndexes();
public boolean hasMultipleValues();

public ColumnCapabilities clone();

public ColumnCapabilitiesImpl merge(ColumnCapabilities other);
}
@@ -105,6 +105,19 @@ public boolean hasMultipleValues()
return hasMultipleValues;
}

@Override
public ColumnCapabilities clone()
{
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.type = type;
capabilities.dictionaryEncoded = dictionaryEncoded;
capabilities.runLengthEncoded = runLengthEncoded;
capabilities.hasInvertedIndexes = hasInvertedIndexes;
capabilities.hasSpatialIndexes = hasSpatialIndexes;
capabilities.hasMultipleValues = hasMultipleValues;
return capabilities;
}

public ColumnCapabilitiesImpl setHasMultipleValues(boolean hasMultipleValues)
{
this.hasMultipleValues = hasMultipleValues;
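A small illustrative fragment, not from the patch, showing what the new clone() provides: an independent ColumnCapabilitiesImpl that a new index can adjust (for example resetting hasMultipleValues, as loadDimensionIterable does further down) without mutating the capabilities object still owned by the previous index or segment. ValueType.STRING and the setters are the ones already used elsewhere in this diff.

ColumnCapabilitiesImpl original = new ColumnCapabilitiesImpl();
original.setType(ValueType.STRING);
original.setHasMultipleValues(true);   // e.g. a multi-valued dimension in the old index

ColumnCapabilitiesImpl copy = (ColumnCapabilitiesImpl) original.clone();
copy.setHasMultipleValues(false);      // the fresh index starts out single-valued

// original.hasMultipleValues() is still true; only the copy was changed.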
@@ -800,19 +800,25 @@ public List<String> getDimensionOrder()
* Index dimension ordering could be changed to initialize from DimensionsSpec after resolution of
* https://github.com/druid-io/druid/issues/2011
*/
public void loadDimensionIterable(Iterable<String> oldDimensionOrder)
public void loadDimensionIterable(Map<String, ColumnCapabilities> oldDimensionOrder)
// Review comment (project member): minor nit: rename method to loadDimensions
{
synchronized (dimensionDescs) {
if (!dimensionDescs.isEmpty()) {
throw new ISE("Cannot load dimension order when existing order[%s] is not empty.", dimensionDescs.keySet());
}
for (String dim : oldDimensionOrder) {
if (dimensionDescs.get(dim) == null) {
ColumnCapabilitiesImpl capabilities = new ColumnCapabilitiesImpl();
capabilities.setType(ValueType.STRING);
columnCapabilities.put(dim, capabilities);
addNewDimension(dim, capabilities);
for (Map.Entry<String, ColumnCapabilities> entry : oldDimensionOrder.entrySet()) {
String dimension = entry.getKey();
ColumnCapabilities prev = entry.getValue();
if (dimensionDescs.get(dimension) != null) {
continue;
}
ColumnCapabilitiesImpl capabilities = columnCapabilities.get(dimension);
if (capabilities == null) {
capabilities = (ColumnCapabilitiesImpl) prev.clone();
capabilities.setHasMultipleValues(false);
}
columnCapabilities.put(dimension, capabilities);
addNewDimension(dimension, capabilities);
}
}
}
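A minimal usage sketch of the revised method, with a hypothetical caller (newIndex) and dimension name ("host"): the target index must still be empty, otherwise the ISE above is thrown, and each map entry contributes both its position in the dimension order and its capabilities, which are cloned from the previous index instead of being recreated as bare STRING columns.

// Hypothetical caller; newIndex is an empty IncrementalIndex.
Map<String, ColumnCapabilities> oldDimensionOrder = Maps.newLinkedHashMap();

ColumnCapabilitiesImpl hostCapabilities = new ColumnCapabilitiesImpl();
hostCapabilities.setType(ValueType.STRING);
hostCapabilities.setHasMultipleValues(true);   // carried over from the previous index
oldDimensionOrder.put("host", hostCapabilities);

// "host" is registered first, with a clone of its capabilities (hasMultipleValues reset
// to false) unless newIndex already has capabilities recorded for that column.
newIndex.loadDimensionIterable(oldDimensionOrder);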
19 changes: 13 additions & 6 deletions server/src/main/java/io/druid/segment/realtime/plumber/Sink.java
@@ -24,12 +24,13 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Maps;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import io.druid.data.input.InputRow;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.QueryableIndex;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.incremental.IndexSizeExceededException;
@@ -43,8 +44,8 @@
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

@@ -60,7 +61,7 @@ public class Sink implements Iterable<FireHydrant>
private final int maxRowsInMemory;
private final boolean reportParseExceptions;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();
private final LinkedHashSet<String> dimOrder = Sets.newLinkedHashSet();
private final Map<String, ColumnCapabilities> dimOrder = Maps.newLinkedHashMap();
private final AtomicInteger numRowsExcludingCurrIndex = new AtomicInteger();
private volatile FireHydrant currHydrant;
private volatile boolean writable = true;
@@ -251,11 +252,17 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
if (lastHydrant.hasSwapped()) {
QueryableIndex oldIndex = lastHydrant.getSegment().asQueryableIndex();
for (String dim : oldIndex.getAvailableDimensions()) {
dimOrder.add(dim);
if (!dimOrder.containsKey(dim)) {
dimOrder.put(dim, oldIndex.getColumn(dim).getCapabilities());
}
}
} else {
IncrementalIndex oldIndex = lastHydrant.getIndex();
dimOrder.addAll(oldIndex.getDimensionOrder());
IncrementalIndex<?> oldIndex = lastHydrant.getIndex();
for (IncrementalIndex.DimensionDesc desc : oldIndex.getDimensions()) {
if (!dimOrder.containsKey(desc.getName())) {
dimOrder.put(desc.getName(), desc.getCapabilities());
}
}
}
newIndex.loadDimensionIterable(dimOrder);
}