diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java index 6fb8a81bf128..0716e162feeb 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DetermineHashedPartitionsJob.java @@ -284,8 +284,7 @@ protected void setup(Context context) @Override protected void innerMap( InputRow inputRow, - Context context, - boolean reportParseExceptions + Context context ) throws IOException { diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java index ec1039dde39d..274861291103 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/DeterminePartitionsJob.java @@ -313,8 +313,7 @@ protected void setup(Context context) @Override protected void innerMap( InputRow inputRow, - Context context, - boolean reportParseExceptions + Context context ) throws IOException, InterruptedException { final List groupKey = Rows.toGroupKey( @@ -395,8 +394,7 @@ protected void setup(Context context) @Override protected void innerMap( InputRow inputRow, - Context context, - boolean reportParseExceptions + Context context ) throws IOException, InterruptedException { final Map> dims = Maps.newHashMap(); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java index 2c41a3fc07e9..b1b2f7fc894c 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java @@ -308,11 +308,6 @@ public boolean 
isOverwriteFiles() return schema.getTuningConfig().isOverwriteFiles(); } - public boolean isIgnoreInvalidRows() - { - return schema.getTuningConfig().isIgnoreInvalidRows(); - } - public void setShardSpecs(Map> shardSpecs) { this.schema = schema.withTuningConfig(schema.getTuningConfig().withShardSpecs(shardSpecs)); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java index 48f6b1da4e09..02ced6cf10c6 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerMapper.java @@ -45,7 +45,6 @@ public abstract class HadoopDruidIndexerMapper extends Mapper< protected HadoopDruidIndexerConfig config; private InputRowParser parser; protected GranularitySpec granularitySpec; - private boolean reportParseExceptions; @Override protected void setup(Context context) @@ -54,7 +53,6 @@ protected void setup(Context context) config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration()); parser = config.getParser(); granularitySpec = config.getGranularitySpec(); - reportParseExceptions = !config.isIgnoreInvalidRows(); } public HadoopDruidIndexerConfig getConfig() @@ -88,7 +86,7 @@ protected void map(Object key, Object value, Context context) throws IOException if (!granularitySpec.bucketIntervals().isPresent() || granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch())) .isPresent()) { - innerMap(inputRow, context, reportParseExceptions); + innerMap(inputRow, context); } else { context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).increment(1); } @@ -147,7 +145,7 @@ private static List parseInputRow(Object value, InputRowParser parser) } } - protected abstract void innerMap(InputRow inputRow, Context context, boolean reportParseExceptions) + protected abstract void 
innerMap(InputRow inputRow, Context context) throws IOException, InterruptedException; } diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java index 1f17fb89fe86..502066ce1de5 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java @@ -85,7 +85,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() private final boolean leaveIntermediate; private final Boolean cleanupOnFailure; private final boolean overwriteFiles; - private final boolean ignoreInvalidRows; + private final Boolean ignoreInvalidRows; private final Map jobProperties; private final boolean combineText; private final boolean useCombiner; @@ -108,7 +108,7 @@ public HadoopTuningConfig( final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure, final @JsonProperty("overwriteFiles") boolean overwriteFiles, - final @Deprecated @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows, + final @Deprecated @JsonProperty("ignoreInvalidRows") Boolean ignoreInvalidRows, final @JsonProperty("jobProperties") Map jobProperties, final @JsonProperty("combineText") boolean combineText, final @JsonProperty("useCombiner") Boolean useCombiner, @@ -138,7 +138,6 @@ public HadoopTuningConfig( this.leaveIntermediate = leaveIntermediate; this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure; this.overwriteFiles = overwriteFiles; - this.ignoreInvalidRows = ignoreInvalidRows; this.jobProperties = (jobProperties == null ? ImmutableMap.of() : ImmutableMap.copyOf(jobProperties)); @@ -152,10 +151,16 @@ public HadoopTuningConfig( this.useExplicitVersion = useExplicitVersion; this.allowedHadoopPrefix = allowedHadoopPrefix == null ? 
ImmutableList.of() : allowedHadoopPrefix; - if (!this.ignoreInvalidRows) { - this.maxParseExceptions = 0; + + this.ignoreInvalidRows = ignoreInvalidRows == null ? false : ignoreInvalidRows; + if (maxParseExceptions != null) { + this.maxParseExceptions = maxParseExceptions; } else { - this.maxParseExceptions = maxParseExceptions == null ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS : maxParseExceptions; + if (ignoreInvalidRows == null || !ignoreInvalidRows) { + this.maxParseExceptions = 0; + } else { + this.maxParseExceptions = TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS; + } } this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions; } @@ -221,7 +226,7 @@ public boolean isOverwriteFiles() } @JsonProperty - public boolean isIgnoreInvalidRows() + public Boolean isIgnoreInvalidRows() { return ignoreInvalidRows; } diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index ac34c755ae95..6bf786be5a66 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -286,7 +286,7 @@ private static IncrementalIndex makeIncrementalIndex( IncrementalIndex newIndex = new IncrementalIndex.Builder() .setIndexSchema(indexSchema) - .setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows()) + .setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows()) // only used by OffHeapIncrementalIndex .setMaxRowCount(tuningConfig.getRowFlushBoundary()) .setMaxBytesInMemory(TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory())) .buildOnheap(); @@ -334,8 +334,7 @@ protected void setup(Context context) @Override protected void innerMap( InputRow inputRow, - Context context, - boolean reportParseExceptions + Context context ) throws IOException, InterruptedException { // Group by bucket, sort by timestamp diff --git 
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerMapperTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerMapperTest.java index 8630d13bdcac..43ca77d6217b 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerMapperTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerMapperTest.java @@ -262,8 +262,7 @@ public static class MyMapper extends HadoopDruidIndexerMapper @Override protected void innerMap( final InputRow inputRow, - final Context context, - final boolean reportParseExceptions + final Context context ) { rows.add(inputRow); diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java index c496d0237144..f95fba867133 100644 --- a/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/org/apache/druid/segment/incremental/IncrementalIndex.java @@ -231,7 +231,7 @@ public ColumnCapabilities getColumnCapabilities(String columnName) private final AggregatorFactory[] metrics; private final AggregatorType[] aggs; private final boolean deserializeComplexMetrics; - private final boolean reportParseExceptions; + private final boolean reportParseExceptions; // only used by OffHeapIncrementalIndex private final Metadata metadata; private final Map metricDescs;