diff --git a/api/src/main/java/io/druid/data/input/impl/MapInputRowParser.java b/api/src/main/java/io/druid/data/input/impl/MapInputRowParser.java
index 8847dea0278e..1fafa37a6241 100644
--- a/api/src/main/java/io/druid/data/input/impl/MapInputRowParser.java
+++ b/api/src/main/java/io/druid/data/input/impl/MapInputRowParser.java
@@ -23,11 +23,9 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import io.druid.data.input.InputRow;
 import io.druid.data.input.MapBasedInputRow;
 import io.druid.java.util.common.parsers.ParseException;
-
 import org.joda.time.DateTime;
 
 import java.util.List;
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
index f6774843ee7c..2f2862fdbe80 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java
@@ -261,6 +261,7 @@ private Map<Interval, Optional<HyperLogLogCollector>> determineShardSpecs(
     // determine intervals containing data and prime HLL collectors
     final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = Maps.newHashMap();
     int thrownAway = 0;
+    int unparseable = 0;
 
     log.info("Determining intervals and shardSpecs");
     long determineShardSpecsStartMillis = System.currentTimeMillis();
@@ -269,48 +270,61 @@ private Map<Interval, Optional<HyperLogLogCollector>> determineShardSpecs(
             firehoseTempDir)
     ) {
       while (firehose.hasMore()) {
-        final InputRow inputRow = firehose.nextRow();
+        try {
+          final InputRow inputRow = firehose.nextRow();
 
-        // The null inputRow means the caller must skip this row.
-        if (inputRow == null) {
-          continue;
-        }
+          // The null inputRow means the caller must skip this row.
+          if (inputRow == null) {
+            continue;
+          }
 
-        final Interval interval;
-        if (determineIntervals) {
-          interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
-        } else {
-          final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
-          if (!optInterval.isPresent()) {
-            thrownAway++;
+          final Interval interval;
+          if (determineIntervals) {
+            interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
+          } else {
+            final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+            if (!optInterval.isPresent()) {
+              thrownAway++;
+              continue;
+            }
+            interval = optInterval.get();
+          }
+
+          if (!determineNumPartitions) {
+            // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
+            // for the interval and don't instantiate a HLL collector
+            if (!hllCollectors.containsKey(interval)) {
+              hllCollectors.put(interval, Optional.absent());
+            }
             continue;
           }
-          interval = optInterval.get();
-        }
 
-        if (!determineNumPartitions) {
-          // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
-          // for the interval and don't instantiate a HLL collector
           if (!hllCollectors.containsKey(interval)) {
-            hllCollectors.put(interval, Optional.absent());
+            hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
           }
-          continue;
-        }
 
-        if (!hllCollectors.containsKey(interval)) {
-          hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
+          List<Object> groupKey = Rows.toGroupKey(
+              queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
+              inputRow
+          );
+          hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
+        }
+        catch (ParseException e) {
+          if (ingestionSchema.getTuningConfig().isReportParseExceptions()) {
+            throw e;
+          } else {
+            unparseable++;
+          }
         }
-
-        List<Object> groupKey = Rows.toGroupKey(
-            queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
-            inputRow
-        );
-        hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
       }
     }
 
+    // These metrics are reported in generateAndPublishSegments()
     if (thrownAway > 0) {
-      log.warn("Unable to to find a matching interval for [%,d] events", thrownAway);
+      log.warn("Unable to find a matching interval for [%,d] events", thrownAway);
+    }
+    if (unparseable > 0) {
+      log.warn("Unable to parse [%,d] events", unparseable);
     }
 
     final ImmutableSortedMap<Interval, Optional<HyperLogLogCollector>> sortedMap = ImmutableSortedMap.copyOf(
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
index d25811517fab..e63de4aa6ff7 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java
@@ -37,8 +37,10 @@
 import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
 import io.druid.indexing.common.actions.TaskAction;
 import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
 import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.parsers.ParseException;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.query.aggregation.LongSumAggregatorFactory;
 import io.druid.segment.IndexIO;
@@ -62,6 +64,7 @@
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.BufferedWriter;
@@ -79,6 +82,9 @@ public class IndexTaskTest
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
       new TimestampSpec(
          "ts",
@@ -446,7 +452,6 @@ public void testCSVFileWithHeaderColumnOverride() throws Exception
       writer.write("2014-01-01T00:00:10Z,a,1\n");
     }
 
-
     IndexTask indexTask = new IndexTask(
         null,
         null,
@@ -487,6 +492,115 @@ public void testCSVFileWithHeaderColumnOverride() throws Exception
     Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
   }
 
+  @Test
+  public void testIgnoreParseException() throws Exception
+  {
+    final File tmpDir = temporaryFolder.newFolder();
+
+    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+    try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+      writer.write("time,d,val\n");
+      writer.write("unparseable,a,1\n");
+      writer.write("2014-01-01T00:00:10Z,a,1\n");
+    }
+
+    // GranularitySpec.intervals and numShards must be null to verify reportParseException=false is respected both in
+    // IndexTask.determineShardSpecs() and IndexTask.generateAndPublishSegments()
+    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
+        tmpDir,
+        new CSVParseSpec(
+            new TimestampSpec(
+                "time",
+                "auto",
+                null
+            ),
+            new DimensionsSpec(
+                null,
+                Lists.newArrayList(),
+                Lists.newArrayList()
+            ),
+            null,
+            Arrays.asList("time", "dim", "val"),
+            true,
+            0
+        ),
+        null,
+        2,
+        null,
+        false,
+        false,
+        false // ignore parse exception
+    );
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        parseExceptionIgnoreSpec,
+        null,
+        jsonMapper
+    );
+
+    final List<DataSegment> segments = runTask(indexTask);
+
+    Assert.assertEquals(Arrays.asList("d"), segments.get(0).getDimensions());
+    Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics());
+    Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
+  }
+
+  @Test
+  public void testReportParseException() throws Exception
+  {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("Unparseable timestamp found!");
+
+    final File tmpDir = temporaryFolder.newFolder();
+
+    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+    try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+      writer.write("time,d,val\n");
+      writer.write("unparseable,a,1\n");
+      writer.write("2014-01-01T00:00:10Z,a,1\n");
+    }
+
+    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
+        tmpDir,
+        new CSVParseSpec(
+            new TimestampSpec(
+                "time",
+                "auto",
+                null
+            ),
+            new DimensionsSpec(
+                null,
+                Lists.newArrayList(),
+                Lists.newArrayList()
+            ),
+            null,
+            Arrays.asList("time", "dim", "val"),
+            true,
+            0
+        ),
+        null,
+        2,
+        null,
+        false,
+        false,
+        true // report parse exception
+    );
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        parseExceptionIgnoreSpec,
+        null,
+        jsonMapper
+    );
+
+    runTask(indexTask);
+  }
+
   private final List<DataSegment> runTask(final IndexTask indexTask) throws Exception
   {
     final List<DataSegment> segments = Lists.newArrayList();
@@ -577,6 +691,29 @@ private IndexTask.IndexIngestionSpec createIngestionSpec(
       boolean forceExtendableShardSpecs,
       boolean appendToExisting
   )
+  {
+    return createIngestionSpec(
+        baseDir,
+        parseSpec,
+        granularitySpec,
+        targetPartitionSize,
+        numShards,
+        forceExtendableShardSpecs,
+        appendToExisting,
+        true
+    );
+  }
+
+  private IndexTask.IndexIngestionSpec createIngestionSpec(
+      File baseDir,
+      ParseSpec parseSpec,
+      GranularitySpec granularitySpec,
+      Integer targetPartitionSize,
+      Integer numShards,
+      boolean forceExtendableShardSpecs,
+      boolean appendToExisting,
+      boolean reportParseException
+  )
   {
     return new IndexTask.IndexIngestionSpec(
         new DataSchema(
@@ -614,7 +751,7 @@ private IndexTask.IndexIngestionSpec createIngestionSpec(
             null,
             true,
             forceExtendableShardSpecs,
-            true,
+            reportParseException,
             null
         )
     );
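Note: the change above makes IndexTask.determineShardSpecs() honor the reportParseExceptions flag it reads from the tuning config (ingestionSchema.getTuningConfig().isReportParseExceptions()): when the flag is true the first ParseException fails the task, otherwise the unparseable row is counted and skipped. The snippet below is a minimal, self-contained sketch of that skip-or-report pattern only; the local ParseException class and the parseTimestamp helper are illustrative stand-ins, not Druid APIs.

```java
import java.util.Arrays;
import java.util.List;

public class SkipOrReportSketch
{
  // Stand-in for io.druid.java.util.common.parsers.ParseException (illustrative only).
  static class ParseException extends RuntimeException
  {
    ParseException(String message)
    {
      super(message);
    }
  }

  // Hypothetical parser: treats the first CSV field as a millisecond timestamp.
  static long parseTimestamp(String row)
  {
    try {
      return Long.parseLong(row.split(",")[0]);
    }
    catch (NumberFormatException e) {
      throw new ParseException("Unparseable timestamp found!");
    }
  }

  public static void main(String[] args)
  {
    // Analogous to the reportParseExceptions tuning flag: true fails fast, false skips bad rows.
    final boolean reportParseExceptions = false;
    final List<String> rows = Arrays.asList("1388534400000,a,1", "unparseable,a,1");

    int unparseable = 0;
    for (String row : rows) {
      try {
        System.out.println("parsed timestamp: " + parseTimestamp(row));
      }
      catch (ParseException e) {
        if (reportParseExceptions) {
          throw e;       // strict mode: surface the error immediately
        } else {
          unparseable++; // lenient mode: count the row and keep going
        }
      }
    }

    if (unparseable > 0) {
      System.out.println("Unable to parse " + unparseable + " events");
    }
  }
}
```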