From d1b5431c2721ea99e4748a5b736c1d5b87327f04 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Tue, 30 Jan 2018 16:30:43 -0800 Subject: [PATCH 1/5] Discard rows with unparseable numeric dimensions --- .../io/druid/indexer/IndexGeneratorJob.java | 53 ++++- .../java/io/druid/indexer/InputRowSerde.java | 212 +++++++++++++++++- .../indexer/IndexGeneratorCombinerTest.java | 32 ++- .../io/druid/indexer/InputRowSerdeTest.java | 111 ++++++++- ...bleValueMatcherColumnSelectorStrategy.java | 2 +- ...oatValueMatcherColumnSelectorStrategy.java | 2 +- ...ongValueMatcherColumnSelectorStrategy.java | 2 +- .../epinephelinae/GroupByQueryEngineV2.java | 6 +- .../epinephelinae/RowBasedGrouperHelper.java | 6 +- .../java/io/druid/query/topn/TopNMapFn.java | 6 +- .../druid/segment/DimensionHandlerUtils.java | 27 ++- .../druid/segment/DoubleDimensionIndexer.java | 2 +- .../druid/segment/FloatDimensionIndexer.java | 2 +- .../druid/segment/LongDimensionIndexer.java | 2 +- .../incremental/IncrementalIndexTest.java | 58 +++-- .../io/druid/sql/calcite/rel/QueryMaker.java | 6 +- 16 files changed, 457 insertions(+), 72 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index c9111d00cd35..ca4731265a66 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -64,6 +64,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.InvalidJobConfException; import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Partitioner; @@ -210,9 +211,17 @@ public boolean run() boolean success = job.waitForCompletion(true); - Counter invalidRowCount = job.getCounters() - .findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER); - jobStats.setInvalidRowCount(invalidRowCount.getValue()); + Counters counters = job.getCounters(); + if (counters == null) { + log.info("No counters found for job [%s]", job.getJobName()); + } else { + Counter invalidRowCount = counters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER); + if (invalidRowCount != null) { + jobStats.setInvalidRowCount(invalidRowCount.getValue()); + } else { + log.info("No invalid row counter found for job [%s]", job.getJobName()); + } + } return success; } @@ -252,12 +261,15 @@ private static IncrementalIndex makeIncrementalIndex( return newIndex; } + + public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper { private static final HashFunction hashFunction = Hashing.murmur3_128(); private AggregatorFactory[] aggregators; private AggregatorFactory[] combiningAggs; + private Map typeHelperMap; @Override protected void setup(Context context) @@ -269,6 +281,11 @@ protected void setup(Context context) for (int i = 0; i < aggregators.length; ++i) { combiningAggs[i] = aggregators[i].getCombiningFactory(); } + typeHelperMap = InputRowSerde.getTypeHelperMap(config.getSchema() + .getDataSchema() + .getParser() + .getParseSpec() + .getDimensionsSpec()); } @Override @@ -299,9 +316,13 @@ protected void innerMap( // and they contain the columns as they show up in the segment after ingestion, not what you would see in raw // data byte[] serializedInputRow = inputRow instanceof SegmentInputRow ? - InputRowSerde.toBytes(inputRow, combiningAggs, reportParseExceptions) + InputRowSerde.toBytes(typeHelperMap, inputRow, combiningAggs, reportParseExceptions) : - InputRowSerde.toBytes(inputRow, aggregators, reportParseExceptions); + InputRowSerde.toBytes(typeHelperMap, inputRow, aggregators, reportParseExceptions); + + if (serializedInputRow == null) { + return; + } context.write( new SortableBytes( @@ -322,6 +343,8 @@ public static class IndexGeneratorCombiner extends Reducer typeHelperMap; + @Override protected void setup(Context context) @@ -334,6 +357,11 @@ protected void setup(Context context) for (int i = 0; i < aggregators.length; ++i) { combiningAggs[i] = aggregators[i].getCombiningFactory(); } + typeHelperMap = InputRowSerde.getTypeHelperMap(config.getSchema() + .getDataSchema() + .getParser() + .getParseSpec() + .getDimensionsSpec()); } @Override @@ -381,11 +409,16 @@ private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex ind context.progress(); Row row = rows.next(); InputRow inputRow = getInputRowFromRow(row, dimensions); - // reportParseExceptions is true as any unparseable data is already handled by the mapper. - context.write( - key, - new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs, true)) - ); + + byte[] serializedRow = InputRowSerde.toBytes(typeHelperMap, inputRow, combiningAggs, true); + + if (serializedRow != null) { + // reportParseExceptions is true as any unparseable data is already handled by the mapper. + context.write( + key, + new BytesWritable(serializedRow) + ); + } } index.close(); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java index 672f77133746..e2f3faeccc7e 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java @@ -22,17 +22,25 @@ import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.io.ByteArrayDataInput; import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.Rows; +import io.druid.data.input.impl.DimensionSchema; +import io.druid.data.input.impl.DimensionsSpec; import io.druid.java.util.common.IAE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.parsers.ParseException; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.DimensionHandlerUtils; import io.druid.segment.VirtualColumns; +import io.druid.segment.column.ValueType; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.serde.ComplexMetricSerde; import io.druid.segment.serde.ComplexMetrics; @@ -49,7 +57,166 @@ public class InputRowSerde { private static final Logger log = new Logger(InputRowSerde.class); - public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs, boolean reportParseExceptions) + private static final IndexSerdeTypeHelper STRING_HELPER = new StringIndexSerdeTypeHelper(); + private static final IndexSerdeTypeHelper LONG_HELPER = new LongIndexSerdeTypeHelper(); + private static final IndexSerdeTypeHelper FLOAT_HELPER = new FloatIndexSerdeTypeHelper(); + private static final IndexSerdeTypeHelper DOUBLE_HELPER = new DoubleIndexSerdeTypeHelper(); + + public interface IndexSerdeTypeHelper + { + ValueType getType(); + + byte[] serialize(Object value); + + T deserialize(byte[] bytes); + } + + private static final IndexSerdeTypeHelper[] VALUE_TYPE_HELPER_ARRAY = + new IndexSerdeTypeHelper[ValueType.values().length]; + static { + VALUE_TYPE_HELPER_ARRAY[ValueType.STRING.ordinal()] = STRING_HELPER; + VALUE_TYPE_HELPER_ARRAY[ValueType.LONG.ordinal()] = LONG_HELPER; + VALUE_TYPE_HELPER_ARRAY[ValueType.FLOAT.ordinal()] = FLOAT_HELPER; + VALUE_TYPE_HELPER_ARRAY[ValueType.DOUBLE.ordinal()] = DOUBLE_HELPER; + } + + public static Map getTypeHelperMap(DimensionsSpec dimensionsSpec) + { + Map typeHelperMap = Maps.newHashMap(); + for (DimensionSchema dimensionSchema : dimensionsSpec.getDimensions()) { + IndexSerdeTypeHelper typeHelper; + switch (dimensionSchema.getValueType()) { + case STRING: + typeHelper = STRING_HELPER; + break; + case LONG: + typeHelper = LONG_HELPER; + break; + case FLOAT: + typeHelper = FLOAT_HELPER; + break; + case DOUBLE: + typeHelper = DOUBLE_HELPER; + break; + default: + throw new IAE("Invalid type: [%s]", dimensionSchema.getValueType()); + } + typeHelperMap.put(dimensionSchema.getName(), typeHelper); + } + return typeHelperMap; + } + + public static class StringIndexSerdeTypeHelper implements IndexSerdeTypeHelper> + { + @Override + public ValueType getType() + { + return ValueType.STRING; + } + + @Override + public byte[] serialize(Object value) + { + List values = Rows.objectToStrings(value); + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + try { + writeStringArray(values, out); + return out.toByteArray(); + } + catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + @Override + public List deserialize(byte[] bytes) + { + ByteArrayDataInput input = ByteStreams.newDataInput(bytes); + try { + return readStringArray(input); + } + catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + } + + public static class LongIndexSerdeTypeHelper implements IndexSerdeTypeHelper + { + @Override + public ValueType getType() + { + return ValueType.LONG; + } + + @Override + public byte[] serialize(Object value) + { + return Longs.toByteArray( + DimensionHandlerUtils.convertObjectToLong(value, true) + ); + } + + @Override + public Long deserialize(byte[] bytes) + { + return Longs.fromByteArray(bytes); + } + } + + public static class FloatIndexSerdeTypeHelper implements IndexSerdeTypeHelper + { + @Override + public ValueType getType() + { + return ValueType.FLOAT; + } + + @Override + public byte[] serialize(Object value) + { + return Ints.toByteArray( + Float.floatToIntBits(DimensionHandlerUtils.convertObjectToFloat(value, true)) + ); + } + + @Override + public Float deserialize(byte[] bytes) + { + return Float.intBitsToFloat(Ints.fromByteArray(bytes)); + } + } + + public static class DoubleIndexSerdeTypeHelper implements IndexSerdeTypeHelper + { + @Override + public ValueType getType() + { + return ValueType.DOUBLE; + } + + + @Override + public byte[] serialize(Object value) + { + return Longs.toByteArray( + Double.doubleToLongBits(DimensionHandlerUtils.convertObjectToDouble(value, true)) + ); + } + + @Override + public Double deserialize(byte[] bytes) + { + return Double.longBitsToDouble(Longs.fromByteArray(bytes)); + } + } + + public static final byte[] toBytes( + final Map typeHelperMap, + final InputRow row, + AggregatorFactory[] aggs, + boolean reportParseExceptions + ) { try { ByteArrayDataOutput out = ByteStreams.newDataOutput(); @@ -63,9 +230,25 @@ public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs, WritableUtils.writeVInt(out, dimList.size()); if (dimList != null) { for (String dim : dimList) { - List dimValues = row.getDimension(dim); - writeString(dim, out); - writeStringArray(dimValues, out); + IndexSerdeTypeHelper typeHelper = typeHelperMap.get(dim); + if (typeHelper == null) { + typeHelper = STRING_HELPER; + } + try { + byte[] value = typeHelper.serialize(row.getRaw(dim)); + + writeString(dim, out); + WritableUtils.writeVInt(out, typeHelper.getType().ordinal()); + writeBytes(value, out); + } + catch (ParseException pe) { + if (reportParseExceptions) { + throw pe; + } else { + // discard the row if there was a parse error in a dimension + return null; + } + } } } @@ -192,14 +375,25 @@ public static final InputRow fromBytes(byte[] data, AggregatorFactory[] aggs) for (int i = 0; i < dimNum; i++) { String dimension = readString(in); dimensions.add(dimension); - List dimensionValues = readStringArray(in); - if (dimensionValues == null) { + + int type = WritableUtils.readVInt(in); + byte[] value = readBytes(in); + IndexSerdeTypeHelper typeHelper = VALUE_TYPE_HELPER_ARRAY[type]; + Object dimValues = typeHelper.deserialize(value); + + if (dimValues == null) { continue; } - if (dimensionValues.size() == 1) { - event.put(dimension, dimensionValues.get(0)); + + if (type == ValueType.STRING.ordinal()) { + List dimensionValues = (List) dimValues; + if (dimensionValues.size() == 1) { + event.put(dimension, dimensionValues.get(0)); + } else { + event.put(dimension, dimensionValues); + } } else { - event.put(dimension, dimensionValues); + event.put(dimension, dimValues); } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java index 9eaf75349ed9..207dd52eafa3 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java @@ -25,6 +25,7 @@ import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.StringDimensionSchema; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimeAndDimsParseSpec; import io.druid.data.input.impl.TimestampSpec; @@ -144,6 +145,17 @@ public void testMultipleRowsMerged() throws Exception ); BytesWritable key = keySortableBytes.toBytesWritable(); + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("host"), + new StringDimensionSchema("keywords") + ), + null, + null + ); + + Map typeHelperMap = InputRowSerde.getTypeHelperMap(dimensionsSpec); + InputRow row1 = new MapBasedInputRow( timestamp, ImmutableList.of("keywords"), @@ -163,8 +175,8 @@ public void testMultipleRowsMerged() throws Exception ) ); List rows = Lists.newArrayList( - new BytesWritable(InputRowSerde.toBytes(row1, aggregators, true)), - new BytesWritable(InputRowSerde.toBytes(row2, aggregators, true)) + new BytesWritable(InputRowSerde.toBytes(typeHelperMap, row1, aggregators, true)), + new BytesWritable(InputRowSerde.toBytes(typeHelperMap, row2, aggregators, true)) ); Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class); @@ -228,9 +240,21 @@ public void testMultipleRowsNotMerged() throws Exception "visited", 5 ) ); + + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("host"), + new StringDimensionSchema("keywords") + ), + null, + null + ); + + Map typeHelperMap = InputRowSerde.getTypeHelperMap(dimensionsSpec); + List rows = Lists.newArrayList( - new BytesWritable(InputRowSerde.toBytes(row1, aggregators, true)), - new BytesWritable(InputRowSerde.toBytes(row2, aggregators, true)) + new BytesWritable(InputRowSerde.toBytes(typeHelperMap, row1, aggregators, true)), + new BytesWritable(InputRowSerde.toBytes(typeHelperMap, row2, aggregators, true)) ); Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java index 67a9b5abf8a3..2f676aac9bdf 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java @@ -20,9 +20,14 @@ package io.druid.indexer; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.DoubleDimensionSchema; +import io.druid.data.input.impl.FloatDimensionSchema; +import io.druid.data.input.impl.LongDimensionSchema; +import io.druid.data.input.impl.StringDimensionSchema; import io.druid.hll.HyperLogLogCollector; import io.druid.jackson.AggregatorsModule; import io.druid.java.util.common.parsers.ParseException; @@ -35,8 +40,11 @@ import io.druid.segment.ColumnSelectorFactory; import org.easymock.EasyMock; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -53,17 +61,22 @@ public class InputRowSerdeTest new AggregatorsModule(); //registers ComplexMetric serde for hyperUnique } + @Rule + public ExpectedException expectedException = ExpectedException.none(); + public InputRowSerdeTest() { this.timestamp = System.currentTimeMillis(); - this.dims = ImmutableList.of("dim_non_existing", "d1", "d2"); - this.event = ImmutableMap.of( - "d1", "d1v", - "d2", ImmutableList.of("d2v1", "d2v2"), - "m1", 5.0f, - "m2", 100L, - "m3", "m3v" - ); + this.dims = ImmutableList.of("dim_non_existing", "d1", "d2", "d3", "d4", "d5"); + this.event = Maps.newHashMap(); + event.put("d1", "d1v"); + event.put("d2", ImmutableList.of("d2v1", "d2v2")); + event.put("d3", 200L); + event.put("d4", 300.1f); + event.put("d5", 400.5d); + event.put("m1", 5.0f); + event.put("m2", 100L); + event.put("m3", "m3v"); } @Test @@ -99,7 +112,19 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) } }; - byte[] data = InputRowSerde.toBytes(in, aggregatorFactories, false); // Ignore Unparseable aggregator + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2"), + new LongDimensionSchema("d3"), + new FloatDimensionSchema("d4"), + new DoubleDimensionSchema("d5") + ), + null, + null + ); + + byte[] data = InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, aggregatorFactories, false); // Ignore Unparseable aggregator InputRow out = InputRowSerde.fromBytes(data, aggregatorFactories); Assert.assertEquals(timestamp, out.getTimestampFromEpoch()); @@ -107,6 +132,9 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) Assert.assertEquals(Collections.EMPTY_LIST, out.getDimension("dim_non_existing")); Assert.assertEquals(ImmutableList.of("d1v"), out.getDimension("d1")); Assert.assertEquals(ImmutableList.of("d2v1", "d2v2"), out.getDimension("d2")); + Assert.assertEquals(200L, out.getRaw("d3")); + Assert.assertEquals(300.1f, out.getRaw("d4")); + Assert.assertEquals(400.5d, out.getRaw("d5")); Assert.assertEquals(0.0f, out.getMetric("agg_non_existing").floatValue(), 0.00001); Assert.assertEquals(5.0f, out.getMetric("m1out").floatValue(), 0.00001); @@ -117,7 +145,7 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) EasyMock.verify(mockedAggregator); } - @Test(expected = ParseException.class) + @Test public void testThrowParseExceptions() { InputRow in = new MapBasedInputRow( @@ -133,7 +161,66 @@ public void testThrowParseExceptions() new LongSumAggregatorFactory("unparseable", "m3") // Unparseable from String to Long }; - InputRowSerde.toBytes(in, aggregatorFactories, true); + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2"), + new LongDimensionSchema("d3"), + new FloatDimensionSchema("d4"), + new DoubleDimensionSchema("d5") + ), + null, + null + ); + expectedException.expect(ParseException.class); + expectedException.expectMessage("Encountered parse error for aggregator[unparseable]"); + InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, aggregatorFactories, true); + } + + @Test + public void testDimensionParseExceptions() + { + InputRow in = new MapBasedInputRow( + timestamp, + dims, + event + ); + AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ + new LongSumAggregatorFactory("m2out", "m2") + }; + + expectedException.expect(ParseException.class); + expectedException.expectMessage("could not convert value [d1v] to long"); + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new LongDimensionSchema("d1") + ), + null, + null + ); + InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, aggregatorFactories, true); + + expectedException.expect(ParseException.class); + expectedException.expectMessage("could not convert value [d1v] to float"); + dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new FloatDimensionSchema("d1") + ), + null, + null + ); + InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, aggregatorFactories, true); + + expectedException.expect(ParseException.class); + expectedException.expectMessage("could not convert value [d1v] to double"); + dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new DoubleDimensionSchema("d1") + ), + null, + null + ); + InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, aggregatorFactories, true); } } diff --git a/processing/src/main/java/io/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java index f41e6c6a0242..92914d67fa1e 100644 --- a/processing/src/main/java/io/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java @@ -31,7 +31,7 @@ public class DoubleValueMatcherColumnSelectorStrategy @Override public ValueMatcher makeValueMatcher(final BaseDoubleColumnValueSelector selector, final String value) { - final Double matchVal = DimensionHandlerUtils.convertObjectToDouble(value); + final Double matchVal = DimensionHandlerUtils.convertObjectToDouble(value, false); if (matchVal == null) { return BooleanValueMatcher.of(false); } diff --git a/processing/src/main/java/io/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java index af7a2b215ed9..7a74962d41cc 100644 --- a/processing/src/main/java/io/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java @@ -30,7 +30,7 @@ public class FloatValueMatcherColumnSelectorStrategy @Override public ValueMatcher makeValueMatcher(final BaseFloatColumnValueSelector selector, final String value) { - final Float matchVal = DimensionHandlerUtils.convertObjectToFloat(value); + final Float matchVal = DimensionHandlerUtils.convertObjectToFloat(value, false); if (matchVal == null) { return BooleanValueMatcher.of(false); } diff --git a/processing/src/main/java/io/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java index 15a62bd50631..cffd8ffa54f8 100644 --- a/processing/src/main/java/io/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java @@ -30,7 +30,7 @@ public class LongValueMatcherColumnSelectorStrategy @Override public ValueMatcher makeValueMatcher(final BaseLongColumnValueSelector selector, final String value) { - final Long matchVal = DimensionHandlerUtils.convertObjectToLong(value); + final Long matchVal = DimensionHandlerUtils.convertObjectToLong(value, false); if (matchVal == null) { return BooleanValueMatcher.of(false); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 6db22e1efb19..1f2b9ce1e34c 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -688,15 +688,15 @@ private static void convertRowTypesToOutputTypes(List dimensionSp baseVal = baseVal == null ? "" : baseVal.toString(); break; case LONG: - baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal); + baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal, false); baseVal = baseVal == null ? 0L : baseVal; break; case FLOAT: - baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal); + baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal, false); baseVal = baseVal == null ? 0.f : baseVal; break; case DOUBLE: - baseVal = DimensionHandlerUtils.convertObjectToDouble(baseVal); + baseVal = DimensionHandlerUtils.convertObjectToDouble(baseVal, false); baseVal = baseVal == null ? 0.d : baseVal; break; default: diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 290cf13753bf..7051460f7387 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -613,21 +613,21 @@ private static Function[] makeValueConvertFunctions( case LONG: functions[i] = input -> { - final Long val = DimensionHandlerUtils.convertObjectToLong(input); + final Long val = DimensionHandlerUtils.convertObjectToLong(input, false); return val == null ? 0L : val; }; break; case FLOAT: functions[i] = input -> { - final Float val = DimensionHandlerUtils.convertObjectToFloat(input); + final Float val = DimensionHandlerUtils.convertObjectToFloat(input, false); return val == null ? 0.f : val; }; break; case DOUBLE: functions[i] = input -> { - Double val = DimensionHandlerUtils.convertObjectToDouble(input); + Double val = DimensionHandlerUtils.convertObjectToDouble(input, false); return val == null ? 0.0 : val; }; break; diff --git a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java index 24d05fa37775..e8625395f167 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java @@ -52,16 +52,16 @@ public static Function getValueTransformer(ValueType outputType) private static Function STRING_TRANSFORMER = input -> Objects.toString(input, null); private static Function LONG_TRANSFORMER = input -> { - final Long longVal = DimensionHandlerUtils.convertObjectToLong(input); + final Long longVal = DimensionHandlerUtils.convertObjectToLong(input, false); return longVal == null ? DimensionHandlerUtils.ZERO_LONG : longVal; }; private static Function FLOAT_TRANSFORMER = input -> { - final Float floatVal = DimensionHandlerUtils.convertObjectToFloat(input); + final Float floatVal = DimensionHandlerUtils.convertObjectToFloat(input, false); return floatVal == null ? DimensionHandlerUtils.ZERO_FLOAT : floatVal; }; private static Function DOUBLE_TRANSFORMER = input -> { - final Double doubleValue = DimensionHandlerUtils.convertObjectToDouble(input); + final Double doubleValue = DimensionHandlerUtils.convertObjectToDouble(input, false); return doubleValue == null ? DimensionHandlerUtils.ZERO_DOUBLE : doubleValue; }; diff --git a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java index 504e46c4f97f..1c44fccc864d 100644 --- a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java +++ b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java @@ -238,7 +238,7 @@ private static Colu } @Nullable - public static Long convertObjectToLong(@Nullable Object valObj) + public static Long convertObjectToLong(@Nullable Object valObj, boolean throwOnUnparseableString) { if (valObj == null) { return ZERO_LONG; @@ -249,14 +249,19 @@ public static Long convertObjectToLong(@Nullable Object valObj) } else if (valObj instanceof Number) { return ((Number) valObj).longValue(); } else if (valObj instanceof String) { - return DimensionHandlerUtils.getExactLongFromDecimalString((String) valObj); + Long ret = DimensionHandlerUtils.getExactLongFromDecimalString((String) valObj); + // reject unparseable strings during indexing, treat them as nulls in query paths + if (throwOnUnparseableString && ret == null) { + throw new ParseException("could not convert value [%s] to long", valObj); + } + return ret; } else { throw new ParseException("Unknown type[%s]", valObj.getClass()); } } @Nullable - public static Float convertObjectToFloat(@Nullable Object valObj) + public static Float convertObjectToFloat(@Nullable Object valObj, boolean throwOnUnparseableString) { if (valObj == null) { return ZERO_FLOAT; @@ -267,14 +272,19 @@ public static Float convertObjectToFloat(@Nullable Object valObj) } else if (valObj instanceof Number) { return ((Number) valObj).floatValue(); } else if (valObj instanceof String) { - return Floats.tryParse((String) valObj); + Float ret = Floats.tryParse((String) valObj); + // reject unparseable strings during indexing, treat them as nulls in query paths + if (throwOnUnparseableString && ret == null) { + throw new ParseException("could not convert value [%s] to float", valObj); + } + return ret; } else { throw new ParseException("Unknown type[%s]", valObj.getClass()); } } @Nullable - public static Double convertObjectToDouble(@Nullable Object valObj) + public static Double convertObjectToDouble(@Nullable Object valObj, boolean throwOnUnparseableString) { if (valObj == null) { return ZERO_DOUBLE; @@ -285,7 +295,12 @@ public static Double convertObjectToDouble(@Nullable Object valObj) } else if (valObj instanceof Number) { return ((Number) valObj).doubleValue(); } else if (valObj instanceof String) { - return Doubles.tryParse((String) valObj); + Double ret = Doubles.tryParse((String) valObj); + // reject unparseable strings during indexing, treat them as nulls in query paths + if (throwOnUnparseableString && ret == null) { + throw new ParseException("could not convert value [%s] to double", valObj); + } + return ret; } else { throw new ParseException("Unknown type[%s]", valObj.getClass()); } diff --git a/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java b/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java index 26d71abc2933..cdbe4660901a 100644 --- a/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java @@ -39,7 +39,7 @@ public Double processRowValsToUnsortedEncodedKeyComponent(Object dimValues) if (dimValues instanceof List) { throw new UnsupportedOperationException("Numeric columns do not support multivalue rows."); } - return DimensionHandlerUtils.convertObjectToDouble(dimValues); + return DimensionHandlerUtils.convertObjectToDouble(dimValues, true); } @Override diff --git a/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java b/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java index 49c93f576107..3e8485ec308c 100644 --- a/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java @@ -40,7 +40,7 @@ public Float processRowValsToUnsortedEncodedKeyComponent(Object dimValues) throw new UnsupportedOperationException("Numeric columns do not support multivalue rows."); } - return DimensionHandlerUtils.convertObjectToFloat(dimValues); + return DimensionHandlerUtils.convertObjectToFloat(dimValues, true); } @Override diff --git a/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java b/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java index 8aeeca25395d..2256aa91dcd2 100644 --- a/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java @@ -40,7 +40,7 @@ public Long processRowValsToUnsortedEncodedKeyComponent(Object dimValues) throw new UnsupportedOperationException("Numeric columns do not support multivalue rows."); } - return DimensionHandlerUtils.convertObjectToLong(dimValues); + return DimensionHandlerUtils.convertObjectToLong(dimValues, true); } @Override diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java index ee728cb51605..ca6ffe9fb66f 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java @@ -25,13 +25,15 @@ import com.google.common.collect.Lists; import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedInputRow; -import io.druid.data.input.Row; import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.DoubleDimensionSchema; +import io.druid.data.input.impl.FloatDimensionSchema; +import io.druid.data.input.impl.LongDimensionSchema; import io.druid.data.input.impl.StringDimensionSchema; import io.druid.java.util.common.ISE; import io.druid.java.util.common.granularity.Granularities; +import io.druid.java.util.common.parsers.ParseException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory; @@ -40,6 +42,7 @@ import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -60,6 +63,9 @@ interface IndexCreator IncrementalIndex createIndex(); } + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Rule public final CloserRule closer = new CloserRule(false); @@ -76,8 +82,8 @@ public static Collection constructorFeeder() throws IOException DimensionsSpec dimensions = new DimensionsSpec( Arrays.asList( new StringDimensionSchema("string"), - new StringDimensionSchema("float"), - new StringDimensionSchema("long"), + new FloatDimensionSchema("float"), + new LongDimensionSchema("long"), new DoubleDimensionSchema("double") ), null, null ); @@ -206,28 +212,54 @@ public void controlTest() throws IndexSizeExceededException } @Test - public void testNullDimensionTransform() throws IndexSizeExceededException + public void testUnparseableNumerics() throws IndexSizeExceededException { IncrementalIndex index = closer.closeLater(indexCreator.createIndex()); + + expectedException.expect(ParseException.class); + expectedException.expectMessage("could not convert value [asdj] to long"); index.add( new MapBasedInputRow( System.currentTimeMillis() - 1, Lists.newArrayList("string", "float", "long", "double"), ImmutableMap.of( - "string", Arrays.asList("A", null, ""), - "float", Arrays.asList(Float.POSITIVE_INFINITY, null, ""), - "long", Arrays.asList(Long.MIN_VALUE, null, ""), - "double", "" + "string", "A", + "float", "19.0", + "long", "asdj", + "double", 21.0d ) ) ); - Row row = index.iterator().next(); + expectedException.expect(ParseException.class); + expectedException.expectMessage("could not convert value [aaa] to float"); + index.add( + new MapBasedInputRow( + System.currentTimeMillis() - 1, + Lists.newArrayList("string", "float", "long", "double"), + ImmutableMap.of( + "string", "A", + "float", "aaa", + "long", 20, + "double", 21.0d + ) + ) + ); - Assert.assertEquals(Arrays.asList(new String[]{"", "", "A"}), row.getRaw("string")); - Assert.assertEquals(Arrays.asList(new String[]{"", "", String.valueOf(Float.POSITIVE_INFINITY)}), row.getRaw("float")); - Assert.assertEquals(Arrays.asList(new String[]{"", "", String.valueOf(Long.MIN_VALUE)}), row.getRaw("long")); - Assert.assertEquals(0.0, row.getMetric("double").doubleValue(), 0.0); + expectedException.expect(ParseException.class); + expectedException.expectMessage("could not convert value [] to double"); + index.add( + new MapBasedInputRow( + System.currentTimeMillis() - 1, + Lists.newArrayList("string", "float", "long", "double"), + ImmutableMap.of( + "string", "A", + "float", 19.0, + "long", 20, + "double", "" + ) + ) + ); } @Test diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java index ed006a8b95ab..6adefc64f17a 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java @@ -430,21 +430,21 @@ private Object coerce(final Object value, final SqlTypeName sqlType) } } else if (sqlType == SqlTypeName.BIGINT) { try { - coercedValue = DimensionHandlerUtils.convertObjectToLong(value); + coercedValue = DimensionHandlerUtils.convertObjectToLong(value, false); } catch (Exception e) { throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType); } } else if (sqlType == SqlTypeName.FLOAT) { try { - coercedValue = DimensionHandlerUtils.convertObjectToFloat(value); + coercedValue = DimensionHandlerUtils.convertObjectToFloat(value, false); } catch (Exception e) { throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType); } } else if (SqlTypeName.FRACTIONAL_TYPES.contains(sqlType)) { try { - coercedValue = DimensionHandlerUtils.convertObjectToDouble(value); + coercedValue = DimensionHandlerUtils.convertObjectToDouble(value, false); } catch (Exception e) { throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType); From f4cb2c574af11985cdd238aaca02d38e6db0454e Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 31 Jan 2018 19:02:32 -0800 Subject: [PATCH 2/5] PR comments --- .../io/druid/indexer/IndexGeneratorJob.java | 19 +++-- .../java/io/druid/indexer/InputRowSerde.java | 81 +++++++------------ .../indexer/IndexGeneratorCombinerTest.java | 6 +- .../io/druid/indexer/InputRowSerdeTest.java | 2 +- 4 files changed, 47 insertions(+), 61 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index ca4731265a66..e663c71b5787 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -261,8 +261,6 @@ private static IncrementalIndex makeIncrementalIndex( return newIndex; } - - public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper { private static final HashFunction hashFunction = Hashing.murmur3_128(); @@ -321,6 +319,8 @@ protected void innerMap( InputRowSerde.toBytes(typeHelperMap, inputRow, aggregators, reportParseExceptions); if (serializedInputRow == null) { + log.debug("Ignoring invalid row [%s] due to parsing error", inputRow); + context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1); return; } @@ -345,7 +345,6 @@ public static class IndexGeneratorCombiner extends Reducer typeHelperMap; - @Override protected void setup(Context context) throws IOException, InterruptedException @@ -378,11 +377,11 @@ protected void reduce( SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs; IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null, null); - index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators)); + index.add(InputRowSerde.fromBytes(typeHelperMap, first.getBytes(), aggregators)); while (iter.hasNext()) { context.progress(); - InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators); + InputRow value = InputRowSerde.fromBytes(typeHelperMap, iter.next().getBytes(), aggregators); if (!index.canAppendRow()) { dimOrder.addAll(index.getDimensionOrder()); @@ -410,10 +409,10 @@ private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex ind Row row = rows.next(); InputRow inputRow = getInputRowFromRow(row, dimensions); + // reportParseExceptions is true as any unparseable data is already handled by the mapper. byte[] serializedRow = InputRowSerde.toBytes(typeHelperMap, inputRow, combiningAggs, true); if (serializedRow != null) { - // reportParseExceptions is true as any unparseable data is already handled by the mapper. context.write( key, new BytesWritable(serializedRow) @@ -512,6 +511,7 @@ public static class IndexGeneratorReducer extends Reducer metricNames = Lists.newArrayList(); private AggregatorFactory[] aggregators; private AggregatorFactory[] combiningAggs; + private Map typeHelperMap; protected ProgressIndicator makeProgressIndicator(final Context context) { @@ -563,6 +563,11 @@ protected void setup(Context context) metricNames.add(aggregators[i].getName()); combiningAggs[i] = aggregators[i].getCombiningFactory(); } + typeHelperMap = InputRowSerde.getTypeHelperMap(config.getSchema() + .getDataSchema() + .getParser() + .getParseSpec() + .getDimensionsSpec()); } @Override @@ -630,7 +635,7 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) for (final BytesWritable bw : values) { context.progress(); - final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(bw.getBytes(), aggregators)); + final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(typeHelperMap, bw.getBytes(), aggregators)); int numRows = index.add(inputRow); ++lineCount; diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java index e2f3faeccc7e..9e5035d485b6 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java @@ -25,8 +25,7 @@ import com.google.common.io.ByteArrayDataInput; import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; -import com.google.common.primitives.Ints; -import com.google.common.primitives.Longs; + import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.Rows; @@ -66,18 +65,9 @@ public interface IndexSerdeTypeHelper { ValueType getType(); - byte[] serialize(Object value); - - T deserialize(byte[] bytes); - } + void serialize(ByteArrayDataOutput out, Object value); - private static final IndexSerdeTypeHelper[] VALUE_TYPE_HELPER_ARRAY = - new IndexSerdeTypeHelper[ValueType.values().length]; - static { - VALUE_TYPE_HELPER_ARRAY[ValueType.STRING.ordinal()] = STRING_HELPER; - VALUE_TYPE_HELPER_ARRAY[ValueType.LONG.ordinal()] = LONG_HELPER; - VALUE_TYPE_HELPER_ARRAY[ValueType.FLOAT.ordinal()] = FLOAT_HELPER; - VALUE_TYPE_HELPER_ARRAY[ValueType.DOUBLE.ordinal()] = DOUBLE_HELPER; + T deserialize(ByteArrayDataInput in); } public static Map getTypeHelperMap(DimensionsSpec dimensionsSpec) @@ -115,13 +105,11 @@ public ValueType getType() } @Override - public byte[] serialize(Object value) + public void serialize(ByteArrayDataOutput out, Object value) { List values = Rows.objectToStrings(value); - ByteArrayDataOutput out = ByteStreams.newDataOutput(); try { writeStringArray(values, out); - return out.toByteArray(); } catch (IOException ioe) { throw new RuntimeException(ioe); @@ -129,11 +117,10 @@ public byte[] serialize(Object value) } @Override - public List deserialize(byte[] bytes) + public List deserialize(ByteArrayDataInput in) { - ByteArrayDataInput input = ByteStreams.newDataInput(bytes); try { - return readStringArray(input); + return readStringArray(in); } catch (IOException ioe) { throw new RuntimeException(ioe); @@ -150,17 +137,15 @@ public ValueType getType() } @Override - public byte[] serialize(Object value) + public void serialize(ByteArrayDataOutput out, Object value) { - return Longs.toByteArray( - DimensionHandlerUtils.convertObjectToLong(value, true) - ); + out.writeLong(DimensionHandlerUtils.convertObjectToLong(value, true)); } @Override - public Long deserialize(byte[] bytes) + public Long deserialize(ByteArrayDataInput in) { - return Longs.fromByteArray(bytes); + return in.readLong(); } } @@ -173,17 +158,15 @@ public ValueType getType() } @Override - public byte[] serialize(Object value) + public void serialize(ByteArrayDataOutput out, Object value) { - return Ints.toByteArray( - Float.floatToIntBits(DimensionHandlerUtils.convertObjectToFloat(value, true)) - ); + out.writeFloat(DimensionHandlerUtils.convertObjectToFloat(value, true)); } @Override - public Float deserialize(byte[] bytes) + public Float deserialize(ByteArrayDataInput in) { - return Float.intBitsToFloat(Ints.fromByteArray(bytes)); + return in.readFloat(); } } @@ -195,19 +178,16 @@ public ValueType getType() return ValueType.DOUBLE; } - @Override - public byte[] serialize(Object value) + public void serialize(ByteArrayDataOutput out, Object value) { - return Longs.toByteArray( - Double.doubleToLongBits(DimensionHandlerUtils.convertObjectToDouble(value, true)) - ); + out.writeDouble(DimensionHandlerUtils.convertObjectToDouble(value, true)); } @Override - public Double deserialize(byte[] bytes) + public Double deserialize(ByteArrayDataInput in) { - return Double.longBitsToDouble(Longs.fromByteArray(bytes)); + return in.readDouble(); } } @@ -235,11 +215,8 @@ public static final byte[] toBytes( typeHelper = STRING_HELPER; } try { - byte[] value = typeHelper.serialize(row.getRaw(dim)); - writeString(dim, out); - WritableUtils.writeVInt(out, typeHelper.getType().ordinal()); - writeBytes(value, out); + typeHelper.serialize(out, row.getRaw(dim)); } catch (ParseException pe) { if (reportParseExceptions) { @@ -359,10 +336,14 @@ private static List readStringArray(DataInput in) throws IOException return values; } - public static final InputRow fromBytes(byte[] data, AggregatorFactory[] aggs) + public static final InputRow fromBytes( + final Map typeHelperMap, + byte[] data, + AggregatorFactory[] aggs + ) { try { - DataInput in = ByteStreams.newDataInput(data); + ByteArrayDataInput in = ByteStreams.newDataInput(data); //Read timestamp long timestamp = in.readLong(); @@ -376,16 +357,16 @@ public static final InputRow fromBytes(byte[] data, AggregatorFactory[] aggs) String dimension = readString(in); dimensions.add(dimension); - int type = WritableUtils.readVInt(in); - byte[] value = readBytes(in); - IndexSerdeTypeHelper typeHelper = VALUE_TYPE_HELPER_ARRAY[type]; - Object dimValues = typeHelper.deserialize(value); - + IndexSerdeTypeHelper typeHelper = typeHelperMap.get(dimension); + if (typeHelper == null) { + typeHelper = STRING_HELPER; + } + Object dimValues = typeHelper.deserialize(in); if (dimValues == null) { continue; } - if (type == ValueType.STRING.ordinal()) { + if (typeHelper.getType() == ValueType.STRING) { List dimensionValues = (List) dimValues; if (dimensionValues.size() == 1) { event.put(dimension, dimensionValues.get(0)); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java index 207dd52eafa3..9eb75e27b779 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java @@ -195,7 +195,7 @@ public void testMultipleRowsMerged() throws Exception Assert.assertTrue(captureKey.getValue() == key); - InputRow capturedRow = InputRowSerde.fromBytes(captureVal.getValue().getBytes(), aggregators); + InputRow capturedRow = InputRowSerde.fromBytes(typeHelperMap, captureVal.getValue().getBytes(), aggregators); Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow.getDimensions()); Assert.assertEquals(ImmutableList.of(), capturedRow.getDimension("host")); Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow.getDimension("keywords")); @@ -277,7 +277,7 @@ public void testMultipleRowsNotMerged() throws Exception Assert.assertTrue(captureKey1.getValue() == key); Assert.assertTrue(captureKey2.getValue() == key); - InputRow capturedRow1 = InputRowSerde.fromBytes(captureVal1.getValue().getBytes(), aggregators); + InputRow capturedRow1 = InputRowSerde.fromBytes(typeHelperMap, captureVal1.getValue().getBytes(), aggregators); Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow1.getDimensions()); Assert.assertEquals(Collections.singletonList("host1"), capturedRow1.getDimension("host")); Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow1.getDimension("keywords")); @@ -288,7 +288,7 @@ public void testMultipleRowsNotMerged() throws Exception 0.001 ); - InputRow capturedRow2 = InputRowSerde.fromBytes(captureVal2.getValue().getBytes(), aggregators); + InputRow capturedRow2 = InputRowSerde.fromBytes(typeHelperMap, captureVal2.getValue().getBytes(), aggregators); Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow2.getDimensions()); Assert.assertEquals(Collections.singletonList("host2"), capturedRow2.getDimension("host")); Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow2.getDimension("keywords")); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java index 2f676aac9bdf..71609e42dd32 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java @@ -125,7 +125,7 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) ); byte[] data = InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, aggregatorFactories, false); // Ignore Unparseable aggregator - InputRow out = InputRowSerde.fromBytes(data, aggregatorFactories); + InputRow out = InputRowSerde.fromBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), data, aggregatorFactories); Assert.assertEquals(timestamp, out.getTimestampFromEpoch()); Assert.assertEquals(dims, out.getDimensions()); From d3f0cabb27e980c338a739d92da736e13aebd3a0 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 1 Feb 2018 13:15:33 -0800 Subject: [PATCH 3/5] Don't throw away entire row on parse exception --- .../io/druid/indexer/IndexGeneratorJob.java | 16 ++----- .../java/io/druid/indexer/InputRowSerde.java | 48 +++++++++++-------- .../druid/segment/DimensionHandlerUtils.java | 15 +++--- .../io/druid/segment/DimensionIndexer.java | 3 +- .../druid/segment/DoubleDimensionIndexer.java | 6 ++- .../druid/segment/FloatDimensionIndexer.java | 6 ++- .../druid/segment/LongDimensionIndexer.java | 6 ++- .../druid/segment/StringDimensionIndexer.java | 2 +- .../segment/incremental/IncrementalIndex.java | 5 +- .../incremental/IncrementalIndexAdapter.java | 2 +- 10 files changed, 58 insertions(+), 51 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index e663c71b5787..a9726bb2db16 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -318,12 +318,6 @@ protected void innerMap( : InputRowSerde.toBytes(typeHelperMap, inputRow, aggregators, reportParseExceptions); - if (serializedInputRow == null) { - log.debug("Ignoring invalid row [%s] due to parsing error", inputRow); - context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER).increment(1); - return; - } - context.write( new SortableBytes( bucket.get().toGroupKey(), @@ -412,12 +406,10 @@ private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex ind // reportParseExceptions is true as any unparseable data is already handled by the mapper. byte[] serializedRow = InputRowSerde.toBytes(typeHelperMap, inputRow, combiningAggs, true); - if (serializedRow != null) { - context.write( - key, - new BytesWritable(serializedRow) - ); - } + context.write( + key, + new BytesWritable(serializedRow) + ); } index.close(); } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java index 9e5035d485b6..cd1dd531604a 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java @@ -65,7 +65,7 @@ public interface IndexSerdeTypeHelper { ValueType getType(); - void serialize(ByteArrayDataOutput out, Object value); + void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions); T deserialize(ByteArrayDataInput in); } @@ -105,7 +105,7 @@ public ValueType getType() } @Override - public void serialize(ByteArrayDataOutput out, Object value) + public void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions) { List values = Rows.objectToStrings(value); try { @@ -137,9 +137,15 @@ public ValueType getType() } @Override - public void serialize(ByteArrayDataOutput out, Object value) + public void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions) { - out.writeLong(DimensionHandlerUtils.convertObjectToLong(value, true)); + Long ret = DimensionHandlerUtils.convertObjectToLong(value, reportParseExceptions); + if (ret == null) { + // remove null -> zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged + // we'll also need to change the serialized encoding so that it can represent numeric nulls + ret = DimensionHandlerUtils.ZERO_LONG; + } + out.writeLong(ret); } @Override @@ -158,9 +164,15 @@ public ValueType getType() } @Override - public void serialize(ByteArrayDataOutput out, Object value) + public void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions) { - out.writeFloat(DimensionHandlerUtils.convertObjectToFloat(value, true)); + Float ret = DimensionHandlerUtils.convertObjectToFloat(value, reportParseExceptions); + if (ret == null) { + // remove null -> zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged + // we'll also need to change the serialized encoding so that it can represent numeric nulls + ret = DimensionHandlerUtils.ZERO_FLOAT; + } + out.writeFloat(ret); } @Override @@ -179,9 +191,15 @@ public ValueType getType() } @Override - public void serialize(ByteArrayDataOutput out, Object value) + public void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions) { - out.writeDouble(DimensionHandlerUtils.convertObjectToDouble(value, true)); + Double ret = DimensionHandlerUtils.convertObjectToDouble(value, reportParseExceptions); + if (ret == null) { + // remove null -> zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged + // we'll also need to change the serialized encoding so that it can represent numeric nulls + ret = DimensionHandlerUtils.ZERO_DOUBLE; + } + out.writeDouble(ret); } @Override @@ -214,18 +232,8 @@ public static final byte[] toBytes( if (typeHelper == null) { typeHelper = STRING_HELPER; } - try { - writeString(dim, out); - typeHelper.serialize(out, row.getRaw(dim)); - } - catch (ParseException pe) { - if (reportParseExceptions) { - throw pe; - } else { - // discard the row if there was a parse error in a dimension - return null; - } - } + writeString(dim, out); + typeHelper.serialize(out, row.getRaw(dim), reportParseExceptions); } } diff --git a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java index 1c44fccc864d..3fbf9c29bb19 100644 --- a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java +++ b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java @@ -238,7 +238,7 @@ private static Colu } @Nullable - public static Long convertObjectToLong(@Nullable Object valObj, boolean throwOnUnparseableString) + public static Long convertObjectToLong(@Nullable Object valObj, boolean reportParseExceptions) { if (valObj == null) { return ZERO_LONG; @@ -250,8 +250,7 @@ public static Long convertObjectToLong(@Nullable Object valObj, boolean throwOnU return ((Number) valObj).longValue(); } else if (valObj instanceof String) { Long ret = DimensionHandlerUtils.getExactLongFromDecimalString((String) valObj); - // reject unparseable strings during indexing, treat them as nulls in query paths - if (throwOnUnparseableString && ret == null) { + if (reportParseExceptions && ret == null) { throw new ParseException("could not convert value [%s] to long", valObj); } return ret; @@ -261,7 +260,7 @@ public static Long convertObjectToLong(@Nullable Object valObj, boolean throwOnU } @Nullable - public static Float convertObjectToFloat(@Nullable Object valObj, boolean throwOnUnparseableString) + public static Float convertObjectToFloat(@Nullable Object valObj, boolean reportParseExceptions) { if (valObj == null) { return ZERO_FLOAT; @@ -273,8 +272,7 @@ public static Float convertObjectToFloat(@Nullable Object valObj, boolean throwO return ((Number) valObj).floatValue(); } else if (valObj instanceof String) { Float ret = Floats.tryParse((String) valObj); - // reject unparseable strings during indexing, treat them as nulls in query paths - if (throwOnUnparseableString && ret == null) { + if (reportParseExceptions && ret == null) { throw new ParseException("could not convert value [%s] to float", valObj); } return ret; @@ -284,7 +282,7 @@ public static Float convertObjectToFloat(@Nullable Object valObj, boolean throwO } @Nullable - public static Double convertObjectToDouble(@Nullable Object valObj, boolean throwOnUnparseableString) + public static Double convertObjectToDouble(@Nullable Object valObj, boolean reportParseExceptions) { if (valObj == null) { return ZERO_DOUBLE; @@ -296,8 +294,7 @@ public static Double convertObjectToDouble(@Nullable Object valObj, boolean thro return ((Number) valObj).doubleValue(); } else if (valObj instanceof String) { Double ret = Doubles.tryParse((String) valObj); - // reject unparseable strings during indexing, treat them as nulls in query paths - if (throwOnUnparseableString && ret == null) { + if (reportParseExceptions && ret == null) { throw new ParseException("could not convert value [%s] to double", valObj); } return ret; diff --git a/processing/src/main/java/io/druid/segment/DimensionIndexer.java b/processing/src/main/java/io/druid/segment/DimensionIndexer.java index 390c1992ea63..cbc298c2f246 100644 --- a/processing/src/main/java/io/druid/segment/DimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/DimensionIndexer.java @@ -122,9 +122,10 @@ public interface DimensionIndexer * * @param dimValues Single row val to process * + * @param reportParseExceptions * @return An array containing an encoded representation of the input row value. */ - EncodedKeyComponentType processRowValsToUnsortedEncodedKeyComponent(Object dimValues); + EncodedKeyComponentType processRowValsToUnsortedEncodedKeyComponent(Object dimValues, boolean reportParseExceptions); /** diff --git a/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java b/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java index cdbe4660901a..2643ed6ff386 100644 --- a/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java @@ -34,12 +34,14 @@ public class DoubleDimensionIndexer implements DimensionIndexer zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged + return ret == null ? DimensionHandlerUtils.ZERO_DOUBLE : ret; } @Override diff --git a/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java b/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java index 3e8485ec308c..6ac82d6abc36 100644 --- a/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java @@ -34,13 +34,15 @@ public class FloatDimensionIndexer implements DimensionIndexer zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged + return ret == null ? DimensionHandlerUtils.ZERO_FLOAT : ret; } @Override diff --git a/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java b/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java index 2256aa91dcd2..29afdde65b9c 100644 --- a/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java @@ -34,13 +34,15 @@ public class LongDimensionIndexer implements DimensionIndexer { @Override - public Long processRowValsToUnsortedEncodedKeyComponent(Object dimValues) + public Long processRowValsToUnsortedEncodedKeyComponent(Object dimValues, boolean reportParseExceptions) { if (dimValues instanceof List) { throw new UnsupportedOperationException("Numeric columns do not support multivalue rows."); } - return DimensionHandlerUtils.convertObjectToLong(dimValues, true); + Long ret = DimensionHandlerUtils.convertObjectToLong(dimValues, reportParseExceptions); + // remove null -> zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged + return ret == null ? DimensionHandlerUtils.ZERO_LONG : ret; } @Override diff --git a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java index bbe89d603388..af994ecfd6e9 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java @@ -188,7 +188,7 @@ public StringDimensionIndexer(MultiValueHandling multiValueHandling) } @Override - public int[] processRowValsToUnsortedEncodedKeyComponent(Object dimValues) + public int[] processRowValsToUnsortedEncodedKeyComponent(Object dimValues, boolean reportParseExceptions) { final int[] encodedDimensionValues; final int oldDictSize = dimLookup.size(); diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 09cb0cfd3dee..d2f19696129c 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -554,7 +554,10 @@ TimeAndDims toTimeAndDims(InputRow row) } DimensionHandler handler = desc.getHandler(); DimensionIndexer indexer = desc.getIndexer(); - Object dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension)); + Object dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent( + row.getRaw(dimension), + reportParseExceptions + ); // Set column capabilities as data is coming in if (!capabilities.hasMultipleValues() && dimsKey != null && handler.getLengthOfEncodedKeyComponent(dimsKey) > 1) { diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index 8cb44ffa669b..8eb148d6bb4a 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -100,7 +100,7 @@ public IncrementalIndexAdapter( // Add 'null' to the dimension's dictionary. if (dimIndex >= dims.length || dims[dimIndex] == null) { - accessor.indexer.processRowValsToUnsortedEncodedKeyComponent(null); + accessor.indexer.processRowValsToUnsortedEncodedKeyComponent(null, true); continue; } final ColumnCapabilities capabilities = dimension.getCapabilities(); From 3991e255ec4cf8c86ac8c4d3833ad7d3fcc61ec8 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Fri, 2 Feb 2018 14:44:54 -0800 Subject: [PATCH 4/5] PR comments --- ...bleValueMatcherColumnSelectorStrategy.java | 2 +- ...oatValueMatcherColumnSelectorStrategy.java | 2 +- ...ongValueMatcherColumnSelectorStrategy.java | 2 +- .../epinephelinae/GroupByQueryEngineV2.java | 6 +++--- .../epinephelinae/RowBasedGrouperHelper.java | 6 +++--- .../java/io/druid/query/topn/TopNMapFn.java | 6 +++--- .../druid/segment/DimensionHandlerUtils.java | 19 +++++++++++++++++++ .../io/druid/sql/calcite/rel/QueryMaker.java | 6 +++--- 8 files changed, 34 insertions(+), 15 deletions(-) diff --git a/processing/src/main/java/io/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java index 92914d67fa1e..f41e6c6a0242 100644 --- a/processing/src/main/java/io/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/filter/DoubleValueMatcherColumnSelectorStrategy.java @@ -31,7 +31,7 @@ public class DoubleValueMatcherColumnSelectorStrategy @Override public ValueMatcher makeValueMatcher(final BaseDoubleColumnValueSelector selector, final String value) { - final Double matchVal = DimensionHandlerUtils.convertObjectToDouble(value, false); + final Double matchVal = DimensionHandlerUtils.convertObjectToDouble(value); if (matchVal == null) { return BooleanValueMatcher.of(false); } diff --git a/processing/src/main/java/io/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java index 7a74962d41cc..af7a2b215ed9 100644 --- a/processing/src/main/java/io/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/filter/FloatValueMatcherColumnSelectorStrategy.java @@ -30,7 +30,7 @@ public class FloatValueMatcherColumnSelectorStrategy @Override public ValueMatcher makeValueMatcher(final BaseFloatColumnValueSelector selector, final String value) { - final Float matchVal = DimensionHandlerUtils.convertObjectToFloat(value, false); + final Float matchVal = DimensionHandlerUtils.convertObjectToFloat(value); if (matchVal == null) { return BooleanValueMatcher.of(false); } diff --git a/processing/src/main/java/io/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java b/processing/src/main/java/io/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java index cffd8ffa54f8..15a62bd50631 100644 --- a/processing/src/main/java/io/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java +++ b/processing/src/main/java/io/druid/query/filter/LongValueMatcherColumnSelectorStrategy.java @@ -30,7 +30,7 @@ public class LongValueMatcherColumnSelectorStrategy @Override public ValueMatcher makeValueMatcher(final BaseLongColumnValueSelector selector, final String value) { - final Long matchVal = DimensionHandlerUtils.convertObjectToLong(value, false); + final Long matchVal = DimensionHandlerUtils.convertObjectToLong(value); if (matchVal == null) { return BooleanValueMatcher.of(false); } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java index 1f2b9ce1e34c..6db22e1efb19 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByQueryEngineV2.java @@ -688,15 +688,15 @@ private static void convertRowTypesToOutputTypes(List dimensionSp baseVal = baseVal == null ? "" : baseVal.toString(); break; case LONG: - baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal, false); + baseVal = DimensionHandlerUtils.convertObjectToLong(baseVal); baseVal = baseVal == null ? 0L : baseVal; break; case FLOAT: - baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal, false); + baseVal = DimensionHandlerUtils.convertObjectToFloat(baseVal); baseVal = baseVal == null ? 0.f : baseVal; break; case DOUBLE: - baseVal = DimensionHandlerUtils.convertObjectToDouble(baseVal, false); + baseVal = DimensionHandlerUtils.convertObjectToDouble(baseVal); baseVal = baseVal == null ? 0.d : baseVal; break; default: diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java index 7051460f7387..290cf13753bf 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/RowBasedGrouperHelper.java @@ -613,21 +613,21 @@ private static Function[] makeValueConvertFunctions( case LONG: functions[i] = input -> { - final Long val = DimensionHandlerUtils.convertObjectToLong(input, false); + final Long val = DimensionHandlerUtils.convertObjectToLong(input); return val == null ? 0L : val; }; break; case FLOAT: functions[i] = input -> { - final Float val = DimensionHandlerUtils.convertObjectToFloat(input, false); + final Float val = DimensionHandlerUtils.convertObjectToFloat(input); return val == null ? 0.f : val; }; break; case DOUBLE: functions[i] = input -> { - Double val = DimensionHandlerUtils.convertObjectToDouble(input, false); + Double val = DimensionHandlerUtils.convertObjectToDouble(input); return val == null ? 0.0 : val; }; break; diff --git a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java index e8625395f167..24d05fa37775 100644 --- a/processing/src/main/java/io/druid/query/topn/TopNMapFn.java +++ b/processing/src/main/java/io/druid/query/topn/TopNMapFn.java @@ -52,16 +52,16 @@ public static Function getValueTransformer(ValueType outputType) private static Function STRING_TRANSFORMER = input -> Objects.toString(input, null); private static Function LONG_TRANSFORMER = input -> { - final Long longVal = DimensionHandlerUtils.convertObjectToLong(input, false); + final Long longVal = DimensionHandlerUtils.convertObjectToLong(input); return longVal == null ? DimensionHandlerUtils.ZERO_LONG : longVal; }; private static Function FLOAT_TRANSFORMER = input -> { - final Float floatVal = DimensionHandlerUtils.convertObjectToFloat(input, false); + final Float floatVal = DimensionHandlerUtils.convertObjectToFloat(input); return floatVal == null ? DimensionHandlerUtils.ZERO_FLOAT : floatVal; }; private static Function DOUBLE_TRANSFORMER = input -> { - final Double doubleValue = DimensionHandlerUtils.convertObjectToDouble(input, false); + final Double doubleValue = DimensionHandlerUtils.convertObjectToDouble(input); return doubleValue == null ? DimensionHandlerUtils.ZERO_DOUBLE : doubleValue; }; diff --git a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java index 3fbf9c29bb19..2f1fd16f511a 100644 --- a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java +++ b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java @@ -35,6 +35,7 @@ import io.druid.segment.column.ValueType; import javax.annotation.Nullable; +import javax.validation.constraints.Null; import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; @@ -237,6 +238,12 @@ private static Colu return strategyFactory.makeColumnSelectorStrategy(capabilities, selector); } + @Nullable + public static Long convertObjectToLong(@Nullable Object valObj) + { + return convertObjectToLong(valObj, false); + } + @Nullable public static Long convertObjectToLong(@Nullable Object valObj, boolean reportParseExceptions) { @@ -259,6 +266,12 @@ public static Long convertObjectToLong(@Nullable Object valObj, boolean reportPa } } + @Nullable + public static Float convertObjectToFloat(@Nullable Object valObj) + { + return convertObjectToFloat(valObj, false); + } + @Nullable public static Float convertObjectToFloat(@Nullable Object valObj, boolean reportParseExceptions) { @@ -281,6 +294,12 @@ public static Float convertObjectToFloat(@Nullable Object valObj, boolean report } } + @Nullable + public static Double convertObjectToDouble(@Nullable Object valObj) + { + return convertObjectToDouble(valObj, false); + } + @Nullable public static Double convertObjectToDouble(@Nullable Object valObj, boolean reportParseExceptions) { diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java index 6adefc64f17a..ed006a8b95ab 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java @@ -430,21 +430,21 @@ private Object coerce(final Object value, final SqlTypeName sqlType) } } else if (sqlType == SqlTypeName.BIGINT) { try { - coercedValue = DimensionHandlerUtils.convertObjectToLong(value, false); + coercedValue = DimensionHandlerUtils.convertObjectToLong(value); } catch (Exception e) { throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType); } } else if (sqlType == SqlTypeName.FLOAT) { try { - coercedValue = DimensionHandlerUtils.convertObjectToFloat(value, false); + coercedValue = DimensionHandlerUtils.convertObjectToFloat(value); } catch (Exception e) { throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType); } } else if (SqlTypeName.FRACTIONAL_TYPES.contains(sqlType)) { try { - coercedValue = DimensionHandlerUtils.convertObjectToDouble(value, false); + coercedValue = DimensionHandlerUtils.convertObjectToDouble(value); } catch (Exception e) { throw new ISE("Cannot coerce[%s] to %s", value.getClass().getName(), sqlType); From 7311fca8747a171241b4c3e8b2c7e79daeecc018 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Fri, 2 Feb 2018 16:30:53 -0800 Subject: [PATCH 5/5] Fix import --- .../src/main/java/io/druid/segment/DimensionHandlerUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java index 2f1fd16f511a..5d7a56a5f046 100644 --- a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java +++ b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java @@ -35,7 +35,6 @@ import io.druid.segment.column.ValueType; import javax.annotation.Nullable; -import javax.validation.constraints.Null; import java.math.BigDecimal; import java.util.ArrayList; import java.util.List;