diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index c9111d00cd35..a9726bb2db16 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -64,6 +64,7 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.InvalidJobConfException; import org.apache.hadoop.mapreduce.Counter; +import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.Partitioner; @@ -210,9 +211,17 @@ public boolean run() boolean success = job.waitForCompletion(true); - Counter invalidRowCount = job.getCounters() - .findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER); - jobStats.setInvalidRowCount(invalidRowCount.getValue()); + Counters counters = job.getCounters(); + if (counters == null) { + log.info("No counters found for job [%s]", job.getJobName()); + } else { + Counter invalidRowCount = counters.findCounter(HadoopDruidIndexerConfig.IndexJobCounters.INVALID_ROW_COUNTER); + if (invalidRowCount != null) { + jobStats.setInvalidRowCount(invalidRowCount.getValue()); + } else { + log.info("No invalid row counter found for job [%s]", job.getJobName()); + } + } return success; } @@ -258,6 +267,7 @@ public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper typeHelperMap; @Override protected void setup(Context context) @@ -269,6 +279,11 @@ protected void setup(Context context) for (int i = 0; i < aggregators.length; ++i) { combiningAggs[i] = aggregators[i].getCombiningFactory(); } + typeHelperMap = InputRowSerde.getTypeHelperMap(config.getSchema() + .getDataSchema() + .getParser() + .getParseSpec() + .getDimensionsSpec()); } @Override @@ -299,9 +314,9 @@ protected void innerMap( // and they contain the columns as they show up in the segment after ingestion, not what you would see in raw // data byte[] serializedInputRow = inputRow instanceof SegmentInputRow ? - InputRowSerde.toBytes(inputRow, combiningAggs, reportParseExceptions) + InputRowSerde.toBytes(typeHelperMap, inputRow, combiningAggs, reportParseExceptions) : - InputRowSerde.toBytes(inputRow, aggregators, reportParseExceptions); + InputRowSerde.toBytes(typeHelperMap, inputRow, aggregators, reportParseExceptions); context.write( new SortableBytes( @@ -322,6 +337,7 @@ public static class IndexGeneratorCombiner extends Reducer typeHelperMap; @Override protected void setup(Context context) @@ -334,6 +350,11 @@ protected void setup(Context context) for (int i = 0; i < aggregators.length; ++i) { combiningAggs[i] = aggregators[i].getCombiningFactory(); } + typeHelperMap = InputRowSerde.getTypeHelperMap(config.getSchema() + .getDataSchema() + .getParser() + .getParseSpec() + .getDimensionsSpec()); } @Override @@ -350,11 +371,11 @@ protected void reduce( SortableBytes keyBytes = SortableBytes.fromBytesWritable(key); Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs; IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, null, null); - index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators)); + index.add(InputRowSerde.fromBytes(typeHelperMap, first.getBytes(), aggregators)); while (iter.hasNext()) { context.progress(); - InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators); + InputRow value = InputRowSerde.fromBytes(typeHelperMap, iter.next().getBytes(), aggregators); if (!index.canAppendRow()) { dimOrder.addAll(index.getDimensionOrder()); @@ -381,10 +402,13 @@ private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex ind context.progress(); Row row = rows.next(); InputRow inputRow = getInputRowFromRow(row, dimensions); + // reportParseExceptions is true as any unparseable data is already handled by the mapper. + byte[] serializedRow = InputRowSerde.toBytes(typeHelperMap, inputRow, combiningAggs, true); + context.write( key, - new BytesWritable(InputRowSerde.toBytes(inputRow, combiningAggs, true)) + new BytesWritable(serializedRow) ); } index.close(); @@ -479,6 +503,7 @@ public static class IndexGeneratorReducer extends Reducer metricNames = Lists.newArrayList(); private AggregatorFactory[] aggregators; private AggregatorFactory[] combiningAggs; + private Map typeHelperMap; protected ProgressIndicator makeProgressIndicator(final Context context) { @@ -530,6 +555,11 @@ protected void setup(Context context) metricNames.add(aggregators[i].getName()); combiningAggs[i] = aggregators[i].getCombiningFactory(); } + typeHelperMap = InputRowSerde.getTypeHelperMap(config.getSchema() + .getDataSchema() + .getParser() + .getParseSpec() + .getDimensionsSpec()); } @Override @@ -597,7 +627,7 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) for (final BytesWritable bw : values) { context.progress(); - final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(bw.getBytes(), aggregators)); + final InputRow inputRow = index.formatRow(InputRowSerde.fromBytes(typeHelperMap, bw.getBytes(), aggregators)); int numRows = index.add(inputRow); ++lineCount; diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java index 672f77133746..cd1dd531604a 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/InputRowSerde.java @@ -22,17 +22,24 @@ import com.google.common.base.Supplier; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.io.ByteArrayDataInput; import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; + import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.Rows; +import io.druid.data.input.impl.DimensionSchema; +import io.druid.data.input.impl.DimensionsSpec; import io.druid.java.util.common.IAE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.parsers.ParseException; import io.druid.query.aggregation.Aggregator; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.DimensionHandlerUtils; import io.druid.segment.VirtualColumns; +import io.druid.segment.column.ValueType; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.serde.ComplexMetricSerde; import io.druid.segment.serde.ComplexMetrics; @@ -49,7 +56,165 @@ public class InputRowSerde { private static final Logger log = new Logger(InputRowSerde.class); - public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs, boolean reportParseExceptions) + private static final IndexSerdeTypeHelper STRING_HELPER = new StringIndexSerdeTypeHelper(); + private static final IndexSerdeTypeHelper LONG_HELPER = new LongIndexSerdeTypeHelper(); + private static final IndexSerdeTypeHelper FLOAT_HELPER = new FloatIndexSerdeTypeHelper(); + private static final IndexSerdeTypeHelper DOUBLE_HELPER = new DoubleIndexSerdeTypeHelper(); + + public interface IndexSerdeTypeHelper + { + ValueType getType(); + + void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions); + + T deserialize(ByteArrayDataInput in); + } + + public static Map getTypeHelperMap(DimensionsSpec dimensionsSpec) + { + Map typeHelperMap = Maps.newHashMap(); + for (DimensionSchema dimensionSchema : dimensionsSpec.getDimensions()) { + IndexSerdeTypeHelper typeHelper; + switch (dimensionSchema.getValueType()) { + case STRING: + typeHelper = STRING_HELPER; + break; + case LONG: + typeHelper = LONG_HELPER; + break; + case FLOAT: + typeHelper = FLOAT_HELPER; + break; + case DOUBLE: + typeHelper = DOUBLE_HELPER; + break; + default: + throw new IAE("Invalid type: [%s]", dimensionSchema.getValueType()); + } + typeHelperMap.put(dimensionSchema.getName(), typeHelper); + } + return typeHelperMap; + } + + public static class StringIndexSerdeTypeHelper implements IndexSerdeTypeHelper> + { + @Override + public ValueType getType() + { + return ValueType.STRING; + } + + @Override + public void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions) + { + List values = Rows.objectToStrings(value); + try { + writeStringArray(values, out); + } + catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + @Override + public List deserialize(ByteArrayDataInput in) + { + try { + return readStringArray(in); + } + catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + } + + public static class LongIndexSerdeTypeHelper implements IndexSerdeTypeHelper + { + @Override + public ValueType getType() + { + return ValueType.LONG; + } + + @Override + public void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions) + { + Long ret = DimensionHandlerUtils.convertObjectToLong(value, reportParseExceptions); + if (ret == null) { + // remove null -> zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged + // we'll also need to change the serialized encoding so that it can represent numeric nulls + ret = DimensionHandlerUtils.ZERO_LONG; + } + out.writeLong(ret); + } + + @Override + public Long deserialize(ByteArrayDataInput in) + { + return in.readLong(); + } + } + + public static class FloatIndexSerdeTypeHelper implements IndexSerdeTypeHelper + { + @Override + public ValueType getType() + { + return ValueType.FLOAT; + } + + @Override + public void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions) + { + Float ret = DimensionHandlerUtils.convertObjectToFloat(value, reportParseExceptions); + if (ret == null) { + // remove null -> zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged + // we'll also need to change the serialized encoding so that it can represent numeric nulls + ret = DimensionHandlerUtils.ZERO_FLOAT; + } + out.writeFloat(ret); + } + + @Override + public Float deserialize(ByteArrayDataInput in) + { + return in.readFloat(); + } + } + + public static class DoubleIndexSerdeTypeHelper implements IndexSerdeTypeHelper + { + @Override + public ValueType getType() + { + return ValueType.DOUBLE; + } + + @Override + public void serialize(ByteArrayDataOutput out, Object value, boolean reportParseExceptions) + { + Double ret = DimensionHandlerUtils.convertObjectToDouble(value, reportParseExceptions); + if (ret == null) { + // remove null -> zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged + // we'll also need to change the serialized encoding so that it can represent numeric nulls + ret = DimensionHandlerUtils.ZERO_DOUBLE; + } + out.writeDouble(ret); + } + + @Override + public Double deserialize(ByteArrayDataInput in) + { + return in.readDouble(); + } + } + + public static final byte[] toBytes( + final Map typeHelperMap, + final InputRow row, + AggregatorFactory[] aggs, + boolean reportParseExceptions + ) { try { ByteArrayDataOutput out = ByteStreams.newDataOutput(); @@ -63,9 +228,12 @@ public static final byte[] toBytes(final InputRow row, AggregatorFactory[] aggs, WritableUtils.writeVInt(out, dimList.size()); if (dimList != null) { for (String dim : dimList) { - List dimValues = row.getDimension(dim); + IndexSerdeTypeHelper typeHelper = typeHelperMap.get(dim); + if (typeHelper == null) { + typeHelper = STRING_HELPER; + } writeString(dim, out); - writeStringArray(dimValues, out); + typeHelper.serialize(out, row.getRaw(dim), reportParseExceptions); } } @@ -176,10 +344,14 @@ private static List readStringArray(DataInput in) throws IOException return values; } - public static final InputRow fromBytes(byte[] data, AggregatorFactory[] aggs) + public static final InputRow fromBytes( + final Map typeHelperMap, + byte[] data, + AggregatorFactory[] aggs + ) { try { - DataInput in = ByteStreams.newDataInput(data); + ByteArrayDataInput in = ByteStreams.newDataInput(data); //Read timestamp long timestamp = in.readLong(); @@ -192,14 +364,25 @@ public static final InputRow fromBytes(byte[] data, AggregatorFactory[] aggs) for (int i = 0; i < dimNum; i++) { String dimension = readString(in); dimensions.add(dimension); - List dimensionValues = readStringArray(in); - if (dimensionValues == null) { + + IndexSerdeTypeHelper typeHelper = typeHelperMap.get(dimension); + if (typeHelper == null) { + typeHelper = STRING_HELPER; + } + Object dimValues = typeHelper.deserialize(in); + if (dimValues == null) { continue; } - if (dimensionValues.size() == 1) { - event.put(dimension, dimensionValues.get(0)); + + if (typeHelper.getType() == ValueType.STRING) { + List dimensionValues = (List) dimValues; + if (dimensionValues.size() == 1) { + event.put(dimension, dimensionValues.get(0)); + } else { + event.put(dimension, dimensionValues); + } } else { - event.put(dimension, dimensionValues); + event.put(dimension, dimValues); } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java index 9eaf75349ed9..9eb75e27b779 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorCombinerTest.java @@ -25,6 +25,7 @@ import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.StringDimensionSchema; import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimeAndDimsParseSpec; import io.druid.data.input.impl.TimestampSpec; @@ -144,6 +145,17 @@ public void testMultipleRowsMerged() throws Exception ); BytesWritable key = keySortableBytes.toBytesWritable(); + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("host"), + new StringDimensionSchema("keywords") + ), + null, + null + ); + + Map typeHelperMap = InputRowSerde.getTypeHelperMap(dimensionsSpec); + InputRow row1 = new MapBasedInputRow( timestamp, ImmutableList.of("keywords"), @@ -163,8 +175,8 @@ public void testMultipleRowsMerged() throws Exception ) ); List rows = Lists.newArrayList( - new BytesWritable(InputRowSerde.toBytes(row1, aggregators, true)), - new BytesWritable(InputRowSerde.toBytes(row2, aggregators, true)) + new BytesWritable(InputRowSerde.toBytes(typeHelperMap, row1, aggregators, true)), + new BytesWritable(InputRowSerde.toBytes(typeHelperMap, row2, aggregators, true)) ); Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class); @@ -183,7 +195,7 @@ public void testMultipleRowsMerged() throws Exception Assert.assertTrue(captureKey.getValue() == key); - InputRow capturedRow = InputRowSerde.fromBytes(captureVal.getValue().getBytes(), aggregators); + InputRow capturedRow = InputRowSerde.fromBytes(typeHelperMap, captureVal.getValue().getBytes(), aggregators); Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow.getDimensions()); Assert.assertEquals(ImmutableList.of(), capturedRow.getDimension("host")); Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow.getDimension("keywords")); @@ -228,9 +240,21 @@ public void testMultipleRowsNotMerged() throws Exception "visited", 5 ) ); + + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("host"), + new StringDimensionSchema("keywords") + ), + null, + null + ); + + Map typeHelperMap = InputRowSerde.getTypeHelperMap(dimensionsSpec); + List rows = Lists.newArrayList( - new BytesWritable(InputRowSerde.toBytes(row1, aggregators, true)), - new BytesWritable(InputRowSerde.toBytes(row2, aggregators, true)) + new BytesWritable(InputRowSerde.toBytes(typeHelperMap, row1, aggregators, true)), + new BytesWritable(InputRowSerde.toBytes(typeHelperMap, row2, aggregators, true)) ); Reducer.Context context = EasyMock.createNiceMock(Reducer.Context.class); @@ -253,7 +277,7 @@ public void testMultipleRowsNotMerged() throws Exception Assert.assertTrue(captureKey1.getValue() == key); Assert.assertTrue(captureKey2.getValue() == key); - InputRow capturedRow1 = InputRowSerde.fromBytes(captureVal1.getValue().getBytes(), aggregators); + InputRow capturedRow1 = InputRowSerde.fromBytes(typeHelperMap, captureVal1.getValue().getBytes(), aggregators); Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow1.getDimensions()); Assert.assertEquals(Collections.singletonList("host1"), capturedRow1.getDimension("host")); Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow1.getDimension("keywords")); @@ -264,7 +288,7 @@ public void testMultipleRowsNotMerged() throws Exception 0.001 ); - InputRow capturedRow2 = InputRowSerde.fromBytes(captureVal2.getValue().getBytes(), aggregators); + InputRow capturedRow2 = InputRowSerde.fromBytes(typeHelperMap, captureVal2.getValue().getBytes(), aggregators); Assert.assertEquals(Arrays.asList("host", "keywords"), capturedRow2.getDimensions()); Assert.assertEquals(Collections.singletonList("host2"), capturedRow2.getDimension("host")); Assert.assertEquals(Arrays.asList("bar", "foo"), capturedRow2.getDimension("keywords")); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java index 67a9b5abf8a3..71609e42dd32 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/InputRowSerdeTest.java @@ -20,9 +20,14 @@ package io.druid.indexer; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import io.druid.data.input.InputRow; import io.druid.data.input.MapBasedInputRow; +import io.druid.data.input.impl.DimensionsSpec; +import io.druid.data.input.impl.DoubleDimensionSchema; +import io.druid.data.input.impl.FloatDimensionSchema; +import io.druid.data.input.impl.LongDimensionSchema; +import io.druid.data.input.impl.StringDimensionSchema; import io.druid.hll.HyperLogLogCollector; import io.druid.jackson.AggregatorsModule; import io.druid.java.util.common.parsers.ParseException; @@ -35,8 +40,11 @@ import io.druid.segment.ColumnSelectorFactory; import org.easymock.EasyMock; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; @@ -53,17 +61,22 @@ public class InputRowSerdeTest new AggregatorsModule(); //registers ComplexMetric serde for hyperUnique } + @Rule + public ExpectedException expectedException = ExpectedException.none(); + public InputRowSerdeTest() { this.timestamp = System.currentTimeMillis(); - this.dims = ImmutableList.of("dim_non_existing", "d1", "d2"); - this.event = ImmutableMap.of( - "d1", "d1v", - "d2", ImmutableList.of("d2v1", "d2v2"), - "m1", 5.0f, - "m2", 100L, - "m3", "m3v" - ); + this.dims = ImmutableList.of("dim_non_existing", "d1", "d2", "d3", "d4", "d5"); + this.event = Maps.newHashMap(); + event.put("d1", "d1v"); + event.put("d2", ImmutableList.of("d2v1", "d2v2")); + event.put("d3", 200L); + event.put("d4", 300.1f); + event.put("d5", 400.5d); + event.put("m1", 5.0f); + event.put("m2", 100L); + event.put("m3", "m3v"); } @Test @@ -99,14 +112,29 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) } }; - byte[] data = InputRowSerde.toBytes(in, aggregatorFactories, false); // Ignore Unparseable aggregator - InputRow out = InputRowSerde.fromBytes(data, aggregatorFactories); + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2"), + new LongDimensionSchema("d3"), + new FloatDimensionSchema("d4"), + new DoubleDimensionSchema("d5") + ), + null, + null + ); + + byte[] data = InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, aggregatorFactories, false); // Ignore Unparseable aggregator + InputRow out = InputRowSerde.fromBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), data, aggregatorFactories); Assert.assertEquals(timestamp, out.getTimestampFromEpoch()); Assert.assertEquals(dims, out.getDimensions()); Assert.assertEquals(Collections.EMPTY_LIST, out.getDimension("dim_non_existing")); Assert.assertEquals(ImmutableList.of("d1v"), out.getDimension("d1")); Assert.assertEquals(ImmutableList.of("d2v1", "d2v2"), out.getDimension("d2")); + Assert.assertEquals(200L, out.getRaw("d3")); + Assert.assertEquals(300.1f, out.getRaw("d4")); + Assert.assertEquals(400.5d, out.getRaw("d5")); Assert.assertEquals(0.0f, out.getMetric("agg_non_existing").floatValue(), 0.00001); Assert.assertEquals(5.0f, out.getMetric("m1out").floatValue(), 0.00001); @@ -117,7 +145,7 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory) EasyMock.verify(mockedAggregator); } - @Test(expected = ParseException.class) + @Test public void testThrowParseExceptions() { InputRow in = new MapBasedInputRow( @@ -133,7 +161,66 @@ public void testThrowParseExceptions() new LongSumAggregatorFactory("unparseable", "m3") // Unparseable from String to Long }; - InputRowSerde.toBytes(in, aggregatorFactories, true); + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new StringDimensionSchema("d1"), + new StringDimensionSchema("d2"), + new LongDimensionSchema("d3"), + new FloatDimensionSchema("d4"), + new DoubleDimensionSchema("d5") + ), + null, + null + ); + expectedException.expect(ParseException.class); + expectedException.expectMessage("Encountered parse error for aggregator[unparseable]"); + InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, aggregatorFactories, true); + } + + @Test + public void testDimensionParseExceptions() + { + InputRow in = new MapBasedInputRow( + timestamp, + dims, + event + ); + AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{ + new LongSumAggregatorFactory("m2out", "m2") + }; + + expectedException.expect(ParseException.class); + expectedException.expectMessage("could not convert value [d1v] to long"); + DimensionsSpec dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new LongDimensionSchema("d1") + ), + null, + null + ); + InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, aggregatorFactories, true); + + expectedException.expect(ParseException.class); + expectedException.expectMessage("could not convert value [d1v] to float"); + dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new FloatDimensionSchema("d1") + ), + null, + null + ); + InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, aggregatorFactories, true); + + expectedException.expect(ParseException.class); + expectedException.expectMessage("could not convert value [d1v] to double"); + dimensionsSpec = new DimensionsSpec( + Arrays.asList( + new DoubleDimensionSchema("d1") + ), + null, + null + ); + InputRowSerde.toBytes(InputRowSerde.getTypeHelperMap(dimensionsSpec), in, aggregatorFactories, true); } } diff --git a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java index 504e46c4f97f..5d7a56a5f046 100644 --- a/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java +++ b/processing/src/main/java/io/druid/segment/DimensionHandlerUtils.java @@ -239,6 +239,12 @@ private static Colu @Nullable public static Long convertObjectToLong(@Nullable Object valObj) + { + return convertObjectToLong(valObj, false); + } + + @Nullable + public static Long convertObjectToLong(@Nullable Object valObj, boolean reportParseExceptions) { if (valObj == null) { return ZERO_LONG; @@ -249,7 +255,11 @@ public static Long convertObjectToLong(@Nullable Object valObj) } else if (valObj instanceof Number) { return ((Number) valObj).longValue(); } else if (valObj instanceof String) { - return DimensionHandlerUtils.getExactLongFromDecimalString((String) valObj); + Long ret = DimensionHandlerUtils.getExactLongFromDecimalString((String) valObj); + if (reportParseExceptions && ret == null) { + throw new ParseException("could not convert value [%s] to long", valObj); + } + return ret; } else { throw new ParseException("Unknown type[%s]", valObj.getClass()); } @@ -257,6 +267,12 @@ public static Long convertObjectToLong(@Nullable Object valObj) @Nullable public static Float convertObjectToFloat(@Nullable Object valObj) + { + return convertObjectToFloat(valObj, false); + } + + @Nullable + public static Float convertObjectToFloat(@Nullable Object valObj, boolean reportParseExceptions) { if (valObj == null) { return ZERO_FLOAT; @@ -267,7 +283,11 @@ public static Float convertObjectToFloat(@Nullable Object valObj) } else if (valObj instanceof Number) { return ((Number) valObj).floatValue(); } else if (valObj instanceof String) { - return Floats.tryParse((String) valObj); + Float ret = Floats.tryParse((String) valObj); + if (reportParseExceptions && ret == null) { + throw new ParseException("could not convert value [%s] to float", valObj); + } + return ret; } else { throw new ParseException("Unknown type[%s]", valObj.getClass()); } @@ -275,6 +295,12 @@ public static Float convertObjectToFloat(@Nullable Object valObj) @Nullable public static Double convertObjectToDouble(@Nullable Object valObj) + { + return convertObjectToDouble(valObj, false); + } + + @Nullable + public static Double convertObjectToDouble(@Nullable Object valObj, boolean reportParseExceptions) { if (valObj == null) { return ZERO_DOUBLE; @@ -285,7 +311,11 @@ public static Double convertObjectToDouble(@Nullable Object valObj) } else if (valObj instanceof Number) { return ((Number) valObj).doubleValue(); } else if (valObj instanceof String) { - return Doubles.tryParse((String) valObj); + Double ret = Doubles.tryParse((String) valObj); + if (reportParseExceptions && ret == null) { + throw new ParseException("could not convert value [%s] to double", valObj); + } + return ret; } else { throw new ParseException("Unknown type[%s]", valObj.getClass()); } diff --git a/processing/src/main/java/io/druid/segment/DimensionIndexer.java b/processing/src/main/java/io/druid/segment/DimensionIndexer.java index 390c1992ea63..cbc298c2f246 100644 --- a/processing/src/main/java/io/druid/segment/DimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/DimensionIndexer.java @@ -122,9 +122,10 @@ public interface DimensionIndexer * * @param dimValues Single row val to process * + * @param reportParseExceptions * @return An array containing an encoded representation of the input row value. */ - EncodedKeyComponentType processRowValsToUnsortedEncodedKeyComponent(Object dimValues); + EncodedKeyComponentType processRowValsToUnsortedEncodedKeyComponent(Object dimValues, boolean reportParseExceptions); /** diff --git a/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java b/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java index 26d71abc2933..2643ed6ff386 100644 --- a/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java @@ -34,12 +34,14 @@ public class DoubleDimensionIndexer implements DimensionIndexer zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged + return ret == null ? DimensionHandlerUtils.ZERO_DOUBLE : ret; } @Override diff --git a/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java b/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java index 49c93f576107..6ac82d6abc36 100644 --- a/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/FloatDimensionIndexer.java @@ -34,13 +34,15 @@ public class FloatDimensionIndexer implements DimensionIndexer zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged + return ret == null ? DimensionHandlerUtils.ZERO_FLOAT : ret; } @Override diff --git a/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java b/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java index 8aeeca25395d..29afdde65b9c 100644 --- a/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/LongDimensionIndexer.java @@ -34,13 +34,15 @@ public class LongDimensionIndexer implements DimensionIndexer { @Override - public Long processRowValsToUnsortedEncodedKeyComponent(Object dimValues) + public Long processRowValsToUnsortedEncodedKeyComponent(Object dimValues, boolean reportParseExceptions) { if (dimValues instanceof List) { throw new UnsupportedOperationException("Numeric columns do not support multivalue rows."); } - return DimensionHandlerUtils.convertObjectToLong(dimValues); + Long ret = DimensionHandlerUtils.convertObjectToLong(dimValues, reportParseExceptions); + // remove null -> zero conversion when https://github.com/druid-io/druid/pull/5278 series of patches is merged + return ret == null ? DimensionHandlerUtils.ZERO_LONG : ret; } @Override diff --git a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java index bbe89d603388..af994ecfd6e9 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java @@ -188,7 +188,7 @@ public StringDimensionIndexer(MultiValueHandling multiValueHandling) } @Override - public int[] processRowValsToUnsortedEncodedKeyComponent(Object dimValues) + public int[] processRowValsToUnsortedEncodedKeyComponent(Object dimValues, boolean reportParseExceptions) { final int[] encodedDimensionValues; final int oldDictSize = dimLookup.size(); diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 09cb0cfd3dee..d2f19696129c 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -554,7 +554,10 @@ TimeAndDims toTimeAndDims(InputRow row) } DimensionHandler handler = desc.getHandler(); DimensionIndexer indexer = desc.getIndexer(); - Object dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent(row.getRaw(dimension)); + Object dimsKey = indexer.processRowValsToUnsortedEncodedKeyComponent( + row.getRaw(dimension), + reportParseExceptions + ); // Set column capabilities as data is coming in if (!capabilities.hasMultipleValues() && dimsKey != null && handler.getLengthOfEncodedKeyComponent(dimsKey) > 1) { diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index 8cb44ffa669b..8eb148d6bb4a 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -100,7 +100,7 @@ public IncrementalIndexAdapter( // Add 'null' to the dimension's dictionary. if (dimIndex >= dims.length || dims[dimIndex] == null) { - accessor.indexer.processRowValsToUnsortedEncodedKeyComponent(null); + accessor.indexer.processRowValsToUnsortedEncodedKeyComponent(null, true); continue; } final ColumnCapabilities capabilities = dimension.getCapabilities(); diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java index ee728cb51605..ca6ffe9fb66f 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java @@ -25,13 +25,15 @@ import com.google.common.collect.Lists; import io.druid.collections.StupidPool; import io.druid.data.input.MapBasedInputRow; -import io.druid.data.input.Row; import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.DoubleDimensionSchema; +import io.druid.data.input.impl.FloatDimensionSchema; +import io.druid.data.input.impl.LongDimensionSchema; import io.druid.data.input.impl.StringDimensionSchema; import io.druid.java.util.common.ISE; import io.druid.java.util.common.granularity.Granularities; +import io.druid.java.util.common.parsers.ParseException; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory; @@ -40,6 +42,7 @@ import org.junit.Assert; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -60,6 +63,9 @@ interface IndexCreator IncrementalIndex createIndex(); } + @Rule + public ExpectedException expectedException = ExpectedException.none(); + @Rule public final CloserRule closer = new CloserRule(false); @@ -76,8 +82,8 @@ public static Collection constructorFeeder() throws IOException DimensionsSpec dimensions = new DimensionsSpec( Arrays.asList( new StringDimensionSchema("string"), - new StringDimensionSchema("float"), - new StringDimensionSchema("long"), + new FloatDimensionSchema("float"), + new LongDimensionSchema("long"), new DoubleDimensionSchema("double") ), null, null ); @@ -206,28 +212,54 @@ public void controlTest() throws IndexSizeExceededException } @Test - public void testNullDimensionTransform() throws IndexSizeExceededException + public void testUnparseableNumerics() throws IndexSizeExceededException { IncrementalIndex index = closer.closeLater(indexCreator.createIndex()); + + expectedException.expect(ParseException.class); + expectedException.expectMessage("could not convert value [asdj] to long"); index.add( new MapBasedInputRow( System.currentTimeMillis() - 1, Lists.newArrayList("string", "float", "long", "double"), ImmutableMap.of( - "string", Arrays.asList("A", null, ""), - "float", Arrays.asList(Float.POSITIVE_INFINITY, null, ""), - "long", Arrays.asList(Long.MIN_VALUE, null, ""), - "double", "" + "string", "A", + "float", "19.0", + "long", "asdj", + "double", 21.0d ) ) ); - Row row = index.iterator().next(); + expectedException.expect(ParseException.class); + expectedException.expectMessage("could not convert value [aaa] to float"); + index.add( + new MapBasedInputRow( + System.currentTimeMillis() - 1, + Lists.newArrayList("string", "float", "long", "double"), + ImmutableMap.of( + "string", "A", + "float", "aaa", + "long", 20, + "double", 21.0d + ) + ) + ); - Assert.assertEquals(Arrays.asList(new String[]{"", "", "A"}), row.getRaw("string")); - Assert.assertEquals(Arrays.asList(new String[]{"", "", String.valueOf(Float.POSITIVE_INFINITY)}), row.getRaw("float")); - Assert.assertEquals(Arrays.asList(new String[]{"", "", String.valueOf(Long.MIN_VALUE)}), row.getRaw("long")); - Assert.assertEquals(0.0, row.getMetric("double").doubleValue(), 0.0); + expectedException.expect(ParseException.class); + expectedException.expectMessage("could not convert value [] to double"); + index.add( + new MapBasedInputRow( + System.currentTimeMillis() - 1, + Lists.newArrayList("string", "float", "long", "double"), + ImmutableMap.of( + "string", "A", + "float", 19.0, + "long", 20, + "double", "" + ) + ) + ); } @Test