From c17ce024675b97ce0d80cae6ffbcf77de30ffd27 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 24 Feb 2016 13:34:06 -0800 Subject: [PATCH] Allow IncrementalIndex to store Long/Float dimensions --- .../IncrementalIndexAddRowsBenchmark.java | 198 +++++++++++++ .../query/filter/ValueMatcherFactory.java | 4 +- .../segment/incremental/IncrementalIndex.java | 273 +++++++++++++++--- .../incremental/IncrementalIndexAdapter.java | 30 +- .../IncrementalIndexStorageAdapter.java | 37 ++- .../incremental/OnheapIncrementalIndex.java | 43 +-- .../SpatialDimensionRowTransformer.java | 3 +- .../plumber/RealtimePlumberSchoolTest.java | 2 +- 8 files changed, 502 insertions(+), 88 deletions(-) create mode 100644 benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java diff --git a/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java new file mode 100644 index 000000000000..5580a46668b9 --- /dev/null +++ b/benchmarks/src/main/java/io/druid/benchmark/IncrementalIndexAddRowsBenchmark.java @@ -0,0 +1,198 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.benchmark; + +import com.google.common.collect.ImmutableMap; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import io.druid.segment.incremental.IncrementalIndex; +import io.druid.segment.incremental.OnheapIncrementalIndex; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.infra.Blackhole; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +public class IncrementalIndexAddRowsBenchmark +{ + private IncrementalIndex incIndex; + private IncrementalIndex incFloatIndex; + private IncrementalIndex incStrIndex; + private static AggregatorFactory[] aggs; + static final int dimensionCount = 8; + private Random rng; + static final int maxRows = 250000; + + private ArrayList longRows = new ArrayList(); + private ArrayList floatRows = new ArrayList(); + private ArrayList stringRows = new ArrayList(); + + + static { + final ArrayList ingestAggregatorFactories = new ArrayList<>(dimensionCount + 1); + ingestAggregatorFactories.add(new CountAggregatorFactory("rows")); + for (int i = 0; i < dimensionCount; ++i) { + ingestAggregatorFactories.add( + new LongSumAggregatorFactory( + String.format("sumResult%s", i), + String.format("Dim_%s", i) + ) + ); + ingestAggregatorFactories.add( + new DoubleSumAggregatorFactory( + String.format("doubleSumResult%s", i), + String.format("Dim_%s", i) + ) + ); + } + aggs = ingestAggregatorFactories.toArray(new AggregatorFactory[0]); + } + + private MapBasedInputRow getLongRow(long timestamp, int rowID, int dimensionCount) + { + List dimensionList = new ArrayList(dimensionCount); + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (int i = 0; i < dimensionCount; i++) { + String dimName = String.format("Dim_%d", i); + dimensionList.add(dimName); + builder.put(dimName, rng.nextLong()); + } + return new MapBasedInputRow(timestamp, dimensionList, builder.build()); + } + + private MapBasedInputRow getFloatRow(long timestamp, int rowID, int dimensionCount) + { + List dimensionList = new ArrayList(dimensionCount); + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (int i = 0; i < dimensionCount; i++) { + String dimName = String.format("Dim_%d", i); + dimensionList.add(dimName); + builder.put(dimName, rng.nextFloat()); + } + return new MapBasedInputRow(timestamp, dimensionList, builder.build()); + } + + private MapBasedInputRow getStringRow(long timestamp, int rowID, int dimensionCount) + { + List dimensionList = new ArrayList(dimensionCount); + ImmutableMap.Builder builder = ImmutableMap.builder(); + for (int i = 0; i < dimensionCount; i++) { + String dimName = String.format("Dim_%d", i); + dimensionList.add(dimName); + builder.put(dimName, String.valueOf(rng.nextLong())); + } + return new MapBasedInputRow(timestamp, dimensionList, builder.build()); + } + + private IncrementalIndex makeIncIndex() + { + return new OnheapIncrementalIndex( + 0, + QueryGranularity.NONE, + aggs, + false, + false, + maxRows + ); + } + + @Setup + public void setup() throws IOException + { + rng = new Random(9999); + + for (int i = 0; i < maxRows; i++) { + longRows.add(getLongRow(0, i, dimensionCount)); + } + + for (int i = 0; i < maxRows; i++) { + floatRows.add(getFloatRow(0, i, dimensionCount)); + } + + for (int i = 0; i < maxRows; i++) { + stringRows.add(getStringRow(0, i, dimensionCount)); + } + } + + @Setup(Level.Iteration) + public void setup2() throws IOException + { + ; + incIndex = makeIncIndex(); + incFloatIndex = makeIncIndex(); + incStrIndex = makeIncIndex(); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + @OperationsPerInvocation(maxRows) + public void normalLongs(Blackhole blackhole) throws Exception + { + for (int i = 0; i < maxRows; i++) { + InputRow row = longRows.get(i); + int rv = incIndex.add(row); + blackhole.consume(rv); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + @OperationsPerInvocation(maxRows) + public void normalFloats(Blackhole blackhole) throws Exception + { + for (int i = 0; i < maxRows; i++) { + InputRow row = floatRows.get(i); + int rv = incFloatIndex.add(row); + blackhole.consume(rv); + } + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + @OperationsPerInvocation(maxRows) + public void normalStrings(Blackhole blackhole) throws Exception + { + for (int i = 0; i < maxRows; i++) { + InputRow row = stringRows.get(i); + int rv = incStrIndex.add(row); + blackhole.consume(rv); + } + } +} diff --git a/processing/src/main/java/io/druid/query/filter/ValueMatcherFactory.java b/processing/src/main/java/io/druid/query/filter/ValueMatcherFactory.java index 8a2a52363eb6..0bfda5de72f6 100644 --- a/processing/src/main/java/io/druid/query/filter/ValueMatcherFactory.java +++ b/processing/src/main/java/io/druid/query/filter/ValueMatcherFactory.java @@ -26,7 +26,7 @@ */ public interface ValueMatcherFactory { - public ValueMatcher makeValueMatcher(String dimension, String value); - public ValueMatcher makeValueMatcher(String dimension, Predicate value); + public ValueMatcher makeValueMatcher(String dimension, Comparable value); + public ValueMatcher makeValueMatcher(String dimension, Predicate value); public ValueMatcher makeValueMatcher(String dimension, Bound bound); } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index c690f201d40d..97888fd0ff14 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -24,6 +24,7 @@ import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -31,6 +32,7 @@ import com.google.common.primitives.Longs; import com.metamx.common.IAE; import com.metamx.common.ISE; +import com.metamx.common.parsers.ParseException; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; @@ -79,6 +81,76 @@ public abstract class IncrementalIndex implements Iterable, { private volatile DateTime maxIngestedEventTime; + private static final Map TYPE_MAP = ImmutableMap.builder() + .put("Long[]", ValueType.LONG) + .put("Double[]", ValueType.FLOAT) + .put("String[]", ValueType.STRING) + .put("Long", ValueType.LONG) + .put("Double", ValueType.FLOAT) + .put("String", ValueType.STRING) + .build(); + + private static final Function STRING_TRANSFORMER = new Function() + { + @Override + public String apply(final Object o) + { + return String.valueOf(o); + } + }; + + private static final Function LONG_TRANSFORMER = new Function() + { + @Override + public Long apply(final Object o) + { + if (o == null) { + return null; + } + if (o instanceof String) { + try { + return Long.valueOf((String) o); + } + catch (NumberFormatException nfe) { + throw new ParseException(nfe, "Unable to parse value[%s] as long in column: ", o); + } + } + if (o instanceof Number) { + return ((Number) o).longValue(); + } + return null; + } + }; + + private static final Function FLOAT_TRANSFORMER = new Function() + { + @Override + public Float apply(final Object o) + { + if (o == null) { + return null; + } + if (o instanceof String) { + try { + return Float.valueOf((String) o); + } + catch (NumberFormatException nfe) { + throw new ParseException(nfe, "Unable to parse value[%s] as float in column: ", o); + } + } + if (o instanceof Number) { + return ((Number) o).floatValue(); + } + return null; + } + }; + + private static final Map VALUE_TRANSFORMS = ImmutableMap.builder() + .put(ValueType.LONG, LONG_TRANSFORMER) + .put(ValueType.FLOAT, FLOAT_TRANSFORMER) + .put(ValueType.STRING, STRING_TRANSFORMER) + .build(); + public static ColumnSelectorFactory makeColumnSelectorFactory( final AggregatorFactory agg, final Supplier in, @@ -271,6 +343,7 @@ public int lookupId(String name) private final Metadata metadata; private final Map metricDescs; + private final Map dimensionDescs; private final Map columnCapabilities; private final List dimValues; @@ -347,20 +420,33 @@ public IncrementalIndex( } } - private DimDim newDimDim(String dimension) - { - return new NullValueConverterDimDim(makeDimDim(dimension, dimensionDescs)); + private DimDim newDimDim(String dimension, ValueType type) { + DimDim newDimDim; + switch (type) { + case LONG: + newDimDim = makeDimDim(dimension, getDimensionDescs()); + break; + case FLOAT: + newDimDim = makeDimDim(dimension, getDimensionDescs()); + break; + case STRING: + newDimDim = new NullValueConverterDimDim(makeDimDim(dimension, getDimensionDescs())); + break; + default: + throw new IAE("Invalid column type: " + type); + } + return newDimDim; } + // use newDimDim() to create a DimDim, makeDimDim() provides the subclass-specific implementation + protected abstract DimDim makeDimDim(String dimension, Object lock); + public abstract ConcurrentNavigableMap getFacts(); public abstract boolean canAppendRow(); public abstract String getOutOfRowsReason(); - // use newDimDim - protected abstract DimDim makeDimDim(String dimension, Object lock); - protected abstract AggregatorType[] initAggs( AggregatorFactory[] metrics, Supplier rowSupplier, @@ -407,6 +493,48 @@ public InputRow formatRow(InputRow row) return row; } + private ValueType getTypeFromDimVal(Object dimVal) + { + Object singleVal; + if (dimVal instanceof List) { + List dimValList = (List) dimVal; + singleVal = dimValList.size() == 0 ? null : dimValList.get(0); + } else { + singleVal = dimVal; + } + + if (singleVal == null) { + return null; + } + + return TYPE_MAP.get(singleVal.getClass().getSimpleName()); + } + + private List getRowDimensionAsComparables(InputRow row, String dimension, ValueType type) + { + final Object dimVal = row.getRaw(dimension); + final Function transformer = VALUE_TRANSFORMS.get(type); + final List dimensionValues; + try { + if (dimVal == null) { + dimensionValues = Collections.emptyList(); + } else if (dimVal instanceof List) { + dimensionValues = Lists.transform((List) dimVal, transformer); + } else { + dimensionValues = Collections.singletonList((Comparable) transformer.apply(dimVal)); + } + } + catch (ParseException pe) { + throw new ParseException(pe.getMessage() + dimension); + } + return dimensionValues; + } + + public Map getDimensionDescs() + { + return dimensionDescs; + } + /** * Adds a new row. The row might correspond with another row that already exists, in which case this will * update that row instead of inserting a new one. @@ -446,13 +574,17 @@ TimeAndDims toTimeAndDims(InputRow row) throws IndexSizeExceededException final List rowDimensions = row.getDimensions(); int[][] dims; + ValueType[] types; List overflow = null; + List overflowTypes = null; synchronized (dimensionDescs) { dims = new int[dimensionDescs.size()][]; + types = new ValueType[dimensionDescs.size()]; for (String dimension : rowDimensions) { - List dimensionValues = row.getDimension(dimension); + List dimensionValues; ColumnCapabilitiesImpl capabilities; + ValueType valType = null; DimensionDesc desc = dimensionDescs.get(dimension); if (desc != null) { capabilities = desc.getCapabilities(); @@ -460,10 +592,17 @@ TimeAndDims toTimeAndDims(InputRow row) throws IndexSizeExceededException capabilities = columnCapabilities.get(dimension); if (capabilities == null) { capabilities = new ColumnCapabilitiesImpl(); - capabilities.setType(ValueType.STRING); + // TODO: For schemaless type discovery, assume everything is a String for now, can change later. + //valType = getTypeFromDimVal(row.getRaw(dimension)); + if (valType == null) { + valType = ValueType.STRING; + } + capabilities.setType(valType); columnCapabilities.put(dimension, capabilities); } } + valType = capabilities.getType(); + dimensionValues = getRowDimensionAsComparables(row, dimension, valType); // Set column capabilities as data is coming in if (!capabilities.hasMultipleValues() && dimensionValues.size() > 1) { @@ -475,8 +614,10 @@ TimeAndDims toTimeAndDims(InputRow row) throws IndexSizeExceededException if (overflow == null) { overflow = Lists.newArrayList(); + overflowTypes = Lists.newArrayList(); } overflow.add(getDimVals(desc.getValues(), dimensionValues)); + overflowTypes.add(valType); } else if (desc.getIndex() > dims.length || dims[desc.getIndex()] != null) { /* * index > dims.length requires that we saw this dimension and added it to the dimensionOrder map, @@ -490,6 +631,7 @@ TimeAndDims toTimeAndDims(InputRow row) throws IndexSizeExceededException throw new ISE("Dimension[%s] occurred more than once in InputRow", dimension); } else { dims[desc.getIndex()] = getDimVals(desc.getValues(), dimensionValues); + types[desc.getIndex()] = valType; } } } @@ -497,15 +639,19 @@ TimeAndDims toTimeAndDims(InputRow row) throws IndexSizeExceededException if (overflow != null) { // Merge overflow and non-overflow int[][] newDims = new int[dims.length + overflow.size()][]; + ValueType[] newTypes = new ValueType[dims.length + overflow.size()]; System.arraycopy(dims, 0, newDims, 0, dims.length); + System.arraycopy(types, 0, newTypes, 0, dims.length); for (int i = 0; i < overflow.size(); ++i) { newDims[dims.length + i] = overflow.get(i); + newTypes[dims.length + i] = overflowTypes.get(i); } dims = newDims; + types = newTypes; } long truncated = gran.truncate(row.getTimestampFromEpoch()); - return new TimeAndDims(Math.max(truncated, minTimestamp), dims); + return new TimeAndDims(Math.max(truncated, minTimestamp), dims, types); } private synchronized void updateMaxIngestedTime(DateTime eventTime) @@ -535,7 +681,7 @@ private long getMaxTimeMillis() return getFacts().lastKey().getTimestamp(); } - private int[] getDimVals(final DimDim dimLookup, final List dimValues) + private int[] getDimVals(final DimDim dimLookup, final List dimValues) { if (dimValues.size() == 0) { // NULL VALUE @@ -544,10 +690,13 @@ private int[] getDimVals(final DimDim dimLookup, final List dimValues) } if (dimValues.size() == 1) { - return new int[]{dimLookup.add(dimValues.get(0))}; + Comparable dimVal = dimValues.get(0); + // For Strings, return an array of dictionary-encoded IDs + // For numerics, return the numeric values directly + return new int[]{dimLookup.add(dimVal)}; } - String[] dimArray = dimValues.toArray(new String[dimValues.size()]); + Comparable[] dimArray = dimValues.toArray(new Comparable[dimValues.size()]); Arrays.sort(dimArray); final int[] retVal = new int[dimArray.length]; @@ -649,7 +798,7 @@ public void loadDimensionIterable(Iterable oldDimensionOrder) @GuardedBy("dimensionDescs") private DimensionDesc addNewDimension(String dim, ColumnCapabilitiesImpl capabilities) { - DimensionDesc desc = new DimensionDesc(dimensionDescs.size(), dim, newDimDim(dim), capabilities); + DimensionDesc desc = new DimensionDesc(dimensionDescs.size(), dim, newDimDim(dim, capabilities.getType()), capabilities); if (dimValues.size() != desc.getIndex()) { throw new ISE("dimensionDescs and dimValues for [%s] is out of sync!!", dim); } @@ -724,11 +873,13 @@ public Row apply(final Map.Entry input) final TimeAndDims timeAndDims = input.getKey(); final int rowOffset = input.getValue(); - int[][] theDims = timeAndDims.getDims(); + int[][] theDims = timeAndDims.getDims(); //TODO: remove dictionary encoding for numerics later + ValueType[] types = timeAndDims.getTypes(); Map theVals = Maps.newLinkedHashMap(); for (int i = 0; i < theDims.length; ++i) { int[] dim = theDims[i]; + ValueType type = types[i]; DimensionDesc dimensionDesc = dimensions.get(i); if (dimensionDesc == null) { continue; @@ -739,13 +890,21 @@ public Row apply(final Map.Entry input) continue; } if (dim.length == 1) { - theVals.put(dimensionName, Strings.nullToEmpty(dimensionDesc.getValues().getValue(dim[0]))); + Comparable val = dimensionDesc.getValues().getValue(dim[0]); + if (type == ValueType.STRING) { + val = Strings.nullToEmpty((String) val); + } + theVals.put(dimensionName, val); } else { - String[] dimStringValue = new String[dim.length]; - for (int j = 0; j < dimStringValue.length; j++) { - dimStringValue[j] = Strings.nullToEmpty(dimensionDesc.getValues().getValue(dim[j])); + Comparable[] dimVals = new Comparable[dim.length]; + for (int j = 0; j < dimVals.length; j++) { + Comparable val = dimensionDesc.getValues().getValue(dim[j]); + if (type == ValueType.STRING) { + val = Strings.nullToEmpty((String) val); + } + dimVals[j] = val; } - theVals.put(dimensionName, dimStringValue); + theVals.put(dimensionName, dimVals); } } @@ -852,42 +1011,42 @@ public ColumnCapabilitiesImpl getCapabilities() } } - static interface DimDim + static interface DimDim> { - public int getId(String value); + public int getId(T value); - public String getValue(int id); + public T getValue(int id); - public boolean contains(String value); + public boolean contains(T value); public int size(); - public String getMinValue(); + public T getMinValue(); - public String getMaxValue(); + public T getMaxValue(); - public int add(String value); + public int add(T value); public SortedDimLookup sort(); } - static interface SortedDimLookup + static interface SortedDimLookup> { public int size(); - public int idToIndex(int id); + public int getSortedIdFromUnsortedId(int id); - public int indexToId(int index); + public int getUnsortedIdFromSortedId(int index); - public String getValue(int index); + public T getValueFromSortedId(int index); } /** * implementation which converts null strings to empty strings and vice versa. */ - static class NullValueConverterDimDim implements DimDim + static class NullValueConverterDimDim implements DimDim { - private final DimDim delegate; + private final DimDim delegate; NullValueConverterDimDim(DimDim delegate) { @@ -943,9 +1102,9 @@ public SortedDimLookup sort() } } - private static class NullValueConverterDimLookup implements SortedDimLookup + private static class NullValueConverterDimLookup implements SortedDimLookup { - private final SortedDimLookup delegate; + private final SortedDimLookup delegate; public NullValueConverterDimLookup(SortedDimLookup delegate) { @@ -959,21 +1118,21 @@ public int size() } @Override - public int indexToId(int index) + public int getUnsortedIdFromSortedId(int index) { - return delegate.indexToId(index); + return delegate.getUnsortedIdFromSortedId(index); } @Override - public int idToIndex(int id) + public int getSortedIdFromUnsortedId(int id) { - return delegate.idToIndex(id); + return delegate.getSortedIdFromUnsortedId(id); } @Override - public String getValue(int index) + public String getValueFromSortedId(int index) { - return Strings.emptyToNull(delegate.getValue(index)); + return Strings.emptyToNull(delegate.getValueFromSortedId(index)); } } @@ -981,14 +1140,17 @@ static final class TimeAndDims { private final long timestamp; private final int[][] dims; + private final ValueType[] types; TimeAndDims( long timestamp, - int[][] dims + int[][] dims, + ValueType[] types ) { this.timestamp = timestamp; this.dims = dims; + this.types = types; } long getTimestamp() @@ -1001,6 +1163,11 @@ int[][] getDims() return dims; } + public ValueType[] getTypes() + { + return types; + } + @Override public String toString() { @@ -1045,6 +1212,26 @@ public int compare(TimeAndDims lhs, TimeAndDims rhs) int index = 0; while (retVal == 0 && index < numComparisons) { + ValueType lhsType = lhs.types[index]; + ValueType rhsType = rhs.types[index]; + + if (lhsType == null) { + if (rhsType == null) { + ++index; + continue; + } + return -1; + } + + if (rhsType == null) { + return 1; + } + + retVal = lhsType.compareTo(rhsType); + if (retVal != 0) { + return retVal; + } + final int[] lhsIdxs = lhs.dims[index]; final int[] rhsIdxs = rhs.dims[index]; @@ -1066,8 +1253,8 @@ public int compare(TimeAndDims lhs, TimeAndDims rhs) while (retVal == 0 && valsIndex < lhsIdxs.length) { if (lhsIdxs[valsIndex] != rhsIdxs[valsIndex]) { final DimDim dimLookup = dimValues.get(index); - final String lhsVal = dimLookup.getValue(lhsIdxs[valsIndex]); - final String rhsVal = dimLookup.getValue(rhsIdxs[valsIndex]); + final Comparable lhsVal = dimLookup.getValue(lhsIdxs[valsIndex]); + final Comparable rhsVal = dimLookup.getValue(rhsIdxs[valsIndex]); if (lhsVal != null && rhsVal != null) { retVal = lhsVal.compareTo(rhsVal); } else if (lhsVal == null ^ rhsVal == null) { diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index e1dfb7208873..057efbc67292 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -20,7 +20,6 @@ package io.druid.segment.incremental; import com.google.common.base.Function; -import com.google.common.base.Strings; import com.google.common.collect.Iterators; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -130,7 +129,8 @@ public IncrementalIndexAdapter( final MutableBitmap[] bitmapIndexes = indexer.invertedIndexes; - for (int dimIdx : dims[dimIndex]) { + for (Comparable dimIdxComparable : dims[dimIndex]) { + Integer dimIdx = (Integer) dimIdxComparable; if (bitmapIndexes[dimIdx] == null) { bitmapIndexes[dimIdx] = bitmapFactory.makeEmptyMutableBitmap(); } @@ -198,14 +198,16 @@ public int size() @Override public String get(int index) { - return dimLookup.getValue(index); + Comparable val = dimLookup.getValueFromSortedId(index); + String strVal = val != null ? val.toString() : null; + return strVal; } @Override public int indexOf(String value) { int id = dimDim.getId(value); - return id < 0 ? -1 : dimLookup.idToIndex(id); + return id < 0 ? -1 : dimLookup.getSortedIdFromUnsortedId(id); } @Override @@ -225,9 +227,9 @@ public Iterable getRows() public Iterator iterator() { final List dimensions = index.getDimensions(); - final IncrementalIndex.SortedDimLookup[] dimLookups = new IncrementalIndex.SortedDimLookup[dimensions.size()]; + final IncrementalIndex.SortedDimLookup[] sortedDimLookups = new IncrementalIndex.SortedDimLookup[dimensions.size()]; for (IncrementalIndex.DimensionDesc dimension : dimensions) { - dimLookups[dimension.getIndex()] = indexers.get(dimension.getName()).getDimLookup(); + sortedDimLookups[dimension.getIndex()] = indexers.get(dimension.getName()).getDimLookup(); } /* @@ -263,7 +265,9 @@ public Rowboat apply(Map.Entry input) } for (int i = 0; i < dimValues[dimIndex].length; ++i) { - dims[dimIndex][i] = dimLookups[dimIndex].idToIndex(dimValues[dimIndex][i]); + dims[dimIndex][i] = sortedDimLookups[dimIndex].getSortedIdFromUnsortedId(dimValues[dimIndex][i]); + //TODO: in later PR, Rowboat will use Comparable[][] instead of int[][] + // Can remove dictionary encoding for numeric dims then. } } @@ -294,7 +298,7 @@ public IndexedInts getBitmapIndex(String dimension, int index) } IncrementalIndex.SortedDimLookup dimLookup = accessor.getDimLookup(); - final int id = dimLookup.indexToId(index); + final int id = dimLookup.getUnsortedIdFromSortedId(index); if (id < 0 || id >= dimLookup.size()) { return EmptyIndexedInts.EMPTY_INDEXED_INTS; } @@ -326,9 +330,17 @@ private boolean hasNullValue(IncrementalIndex.DimDim dimDim, int[] dimIndices) return true; } for (int dimIndex : dimIndices) { - if (Strings.isNullOrEmpty(dimDim.getValue(dimIndex))) { + Comparable val = dimDim.getValue(dimIndex); + + if (val == null) { return true; } + + if (val instanceof String) { + if (((String) val).length() == 0) { + return true; + } + } } return false; } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index b1975c3a7b1c..aecf8b6764ba 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -234,10 +234,10 @@ public Cursor apply(@Nullable final Long input) { cursorMap = index.getSubMap( new IncrementalIndex.TimeAndDims( - timeStart, new int[][]{} + timeStart, new int[][]{}, null ), new IncrementalIndex.TimeAndDims( - Math.min(actualInterval.getEndMillis(), gran.next(input)), new int[][]{} + Math.min(actualInterval.getEndMillis(), gran.next(input)), new int[][]{}, null ) ); if (descending) { @@ -414,8 +414,10 @@ public int getValueCardinality() @Override public String lookupName(int id) { - final String value = dimValLookup.getValue(id); - return extractionFn == null ? value : extractionFn.apply(value); + // TODO: needs update to DimensionSelector interface to allow multi-types, just use Strings for now + final Comparable value = dimValLookup.getValue(id); + final String strValue = value == null ? null : value.toString(); + return extractionFn == null ? strValue : extractionFn.apply(strValue); } @@ -577,7 +579,7 @@ public Object get() if (dimIdx.length == 1) { return dimDim.getValue(dimIdx[0]); } - String[] dimVals = new String[dimIdx.length]; + Comparable[] dimVals = new String[dimIdx.length]; for (int i = 0; i < dimIdx.length; i++) { dimVals[i] = dimDim.getValue(dimIdx[i]); } @@ -594,6 +596,14 @@ public Object get() ); } + private boolean isComparableNullOrEmpty(final Comparable value) + { + if (value instanceof String) { + return Strings.isNullOrEmpty((String) value); + } + return value == null; + } + private ValueMatcher makeFilterMatcher(final Filter filter, final EntryHolder holder) { return filter == null @@ -638,18 +648,18 @@ public EntryHolderValueMatcherFactory( } @Override - public ValueMatcher makeValueMatcher(String dimension, final String value) + public ValueMatcher makeValueMatcher(String dimension, final Comparable value) { IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension); if (dimensionDesc == null) { - return new BooleanValueMatcher(Strings.isNullOrEmpty(value)); + return new BooleanValueMatcher(isComparableNullOrEmpty(value)); } final int dimIndex = dimensionDesc.getIndex(); final IncrementalIndex.DimDim dimDim = dimensionDesc.getValues(); final Integer id = dimDim.getId(value); if (id == null) { - if (Strings.isNullOrEmpty(value)) { + if (isComparableNullOrEmpty(value)) { return new ValueMatcher() { @Override @@ -673,7 +683,7 @@ public boolean matches() { int[][] dims = holder.getKey().getDims(); if (dimIndex >= dims.length || dims[dimIndex] == null) { - return Strings.isNullOrEmpty(value); + return isComparableNullOrEmpty(value); } return Ints.indexOf(dims[dimIndex], id) >= 0; @@ -682,7 +692,7 @@ public boolean matches() } @Override - public ValueMatcher makeValueMatcher(String dimension, final Predicate predicate) + public ValueMatcher makeValueMatcher(String dimension, final Predicate predicate) { IncrementalIndex.DimensionDesc dimensionDesc = index.getDimension(dimension); if (dimensionDesc == null) { @@ -732,7 +742,12 @@ public boolean matches() } for (int dimVal : dims[dimIndex]) { - List stringCoords = Lists.newArrayList(SPLITTER.split(dimDim.getValue(dimVal))); + Comparable fullDimVal = dimDim.getValue(dimVal); + // TODO: decide what to do for non-String spatial dims, skip for now + if (!(fullDimVal instanceof String)) { + return false; + } + List stringCoords = Lists.newArrayList(SPLITTER.split((String) fullDimVal)); float[] coords = new float[stringCoords.size()]; for (int j = 0; j < coords.length; j++) { coords[j] = Float.valueOf(stringCoords.get(j)); diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 67252400dd5f..5f4f58e0e182 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -33,6 +33,7 @@ import io.druid.segment.FloatColumnSelector; import io.druid.segment.LongColumnSelector; import io.druid.segment.ObjectColumnSelector; +import io.druid.segment.column.ValueType; import java.util.List; import java.util.Map; @@ -289,13 +290,13 @@ public void close() } } - static class OnHeapDimDim implements DimDim + static class OnHeapDimDim> implements DimDim { - private final Map valueToId = Maps.newHashMap(); - private String minValue = null; - private String maxValue = null; + private final Map valueToId = Maps.newHashMap(); + private T minValue = null; + private T maxValue = null; - private final List idToValue = Lists.newArrayList(); + private final List idToValue = Lists.newArrayList(); private final Object lock; public OnHeapDimDim(Object lock) @@ -303,7 +304,7 @@ public OnHeapDimDim(Object lock) this.lock = lock; } - public int getId(String value) + public int getId(T value) { synchronized (lock) { final Integer id = valueToId.get(value); @@ -311,14 +312,14 @@ public int getId(String value) } } - public String getValue(int id) + public T getValue(int id) { synchronized (lock) { return idToValue.get(id); } } - public boolean contains(String value) + public boolean contains(T value) { synchronized (lock) { return valueToId.containsKey(value); @@ -332,7 +333,7 @@ public int size() } } - public int add(String value) + public int add(T value) { synchronized (lock) { Integer prev = valueToId.get(value); @@ -349,13 +350,13 @@ public int add(String value) } @Override - public String getMinValue() + public T getMinValue() { return minValue; } @Override - public String getMaxValue() + public T getMaxValue() { return maxValue; } @@ -368,19 +369,19 @@ public OnHeapDimLookup sort() } } - static class OnHeapDimLookup implements SortedDimLookup + static class OnHeapDimLookup> implements SortedDimLookup { - private final String[] sortedVals; + private final List sortedVals; private final int[] idToIndex; private final int[] indexToId; - public OnHeapDimLookup(List idToValue, int length) + public OnHeapDimLookup(List idToValue, int length) { - Map sortedMap = Maps.newTreeMap(); + Map sortedMap = Maps.newTreeMap(); for (int id = 0; id < length; id++) { sortedMap.put(idToValue.get(id), id); } - this.sortedVals = sortedMap.keySet().toArray(new String[length]); + this.sortedVals = Lists.newArrayList(sortedMap.keySet()); this.idToIndex = new int[length]; this.indexToId = new int[length]; int index = 0; @@ -394,23 +395,23 @@ public OnHeapDimLookup(List idToValue, int length) @Override public int size() { - return sortedVals.length; + return sortedVals.size(); } @Override - public int indexToId(int index) + public int getUnsortedIdFromSortedId(int index) { return indexToId[index]; } @Override - public String getValue(int index) + public T getValueFromSortedId(int index) { - return sortedVals[index]; + return sortedVals.get(index); } @Override - public int idToIndex(int id) + public int getSortedIdFromUnsortedId(int id) { return idToIndex[id]; } diff --git a/processing/src/main/java/io/druid/segment/incremental/SpatialDimensionRowTransformer.java b/processing/src/main/java/io/druid/segment/incremental/SpatialDimensionRowTransformer.java index 9576a032da44..fc03a738a8dc 100644 --- a/processing/src/main/java/io/druid/segment/incremental/SpatialDimensionRowTransformer.java +++ b/processing/src/main/java/io/druid/segment/incremental/SpatialDimensionRowTransformer.java @@ -126,7 +126,8 @@ public List getDimension(String dimension) @Override public Object getRaw(String dimension) { - return row.getRaw(dimension); + List retVal = spatialLookup.get(dimension); + return (retVal == null) ? row.getRaw(dimension) : retVal; } @Override diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index e528c2571c9f..43dcf5e90416 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -656,7 +656,7 @@ public long getLongMetric(String metric) @Override public Object getRaw(String dimension) { - return null; + return dimVals; } @Override