@@ -284,8 +284,7 @@ protected void setup(Context context)
   @Override
   protected void innerMap(
       InputRow inputRow,
-      Context context,
-      boolean reportParseExceptions
+      Context context
   ) throws IOException
   {

@@ -313,8 +313,7 @@ protected void setup(Context context)
   @Override
   protected void innerMap(
       InputRow inputRow,
-      Context context,
-      boolean reportParseExceptions
+      Context context
   ) throws IOException, InterruptedException
   {
     final List<Object> groupKey = Rows.toGroupKey(
@@ -395,8 +394,7 @@ protected void setup(Context context)
   @Override
   protected void innerMap(
       InputRow inputRow,
-      Context context,
-      boolean reportParseExceptions
+      Context context
   ) throws IOException, InterruptedException
   {
     final Map<String, Iterable<String>> dims = Maps.newHashMap();
@@ -308,11 +308,6 @@ public boolean isOverwriteFiles()
     return schema.getTuningConfig().isOverwriteFiles();
   }

-  public boolean isIgnoreInvalidRows()
-  {
-    return schema.getTuningConfig().isIgnoreInvalidRows();
-  }
-
   public void setShardSpecs(Map<Long, List<HadoopyShardSpec>> shardSpecs)
   {
     this.schema = schema.withTuningConfig(schema.getTuningConfig().withShardSpecs(shardSpecs));
@@ -45,7 +45,6 @@ public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<
   protected HadoopDruidIndexerConfig config;
   private InputRowParser parser;
   protected GranularitySpec granularitySpec;
-  private boolean reportParseExceptions;

   @Override
   protected void setup(Context context)
@@ -54,7 +53,6 @@ protected void setup(Context context)
     config = HadoopDruidIndexerConfig.fromConfiguration(context.getConfiguration());
     parser = config.getParser();
     granularitySpec = config.getGranularitySpec();
-    reportParseExceptions = !config.isIgnoreInvalidRows();
   }

   public HadoopDruidIndexerConfig getConfig()
@@ -88,7 +86,7 @@ protected void map(Object key, Object value, Context context) throws IOException
       if (!granularitySpec.bucketIntervals().isPresent()
           || granularitySpec.bucketInterval(DateTimes.utc(inputRow.getTimestampFromEpoch()))
                             .isPresent()) {
-        innerMap(inputRow, context, reportParseExceptions);
+        innerMap(inputRow, context);
       } else {
         context.getCounter(HadoopDruidIndexerConfig.IndexJobCounters.ROWS_THROWN_AWAY_COUNTER).increment(1);
       }
@@ -147,7 +145,7 @@ private static List<InputRow> parseInputRow(Object value, InputRowParser parser)
     }
   }

-  protected abstract void innerMap(InputRow inputRow, Context context, boolean reportParseExceptions)
+  protected abstract void innerMap(InputRow inputRow, Context context)
       throws IOException, InterruptedException;

 }
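
Subclasses adapt by dropping the flag from their overrides, as every remaining hunk in this change does. A minimal hypothetical implementer of the new two-argument contract (class name, type parameters, and counter names are illustrative, not from the patch):

```java
import java.io.IOException;

import io.druid.data.input.InputRow;
import io.druid.indexer.HadoopDruidIndexerMapper;
import org.apache.hadoop.io.NullWritable;

// Hypothetical subclass: parse-exception policy is no longer threaded through
// innerMap, so the override only sees the row and the Hadoop context.
public class RowCountingMapper extends HadoopDruidIndexerMapper<NullWritable, NullWritable>
{
  @Override
  protected void innerMap(InputRow inputRow, Context context) throws IOException, InterruptedException
  {
    // Illustrative body: count every in-interval row routed to this mapper.
    context.getCounter("example", "rows").increment(1);
  }
}
```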
@@ -85,7 +85,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
   private final boolean leaveIntermediate;
   private final Boolean cleanupOnFailure;
   private final boolean overwriteFiles;
-  private final boolean ignoreInvalidRows;
+  private final Boolean ignoreInvalidRows;
   private final Map<String, String> jobProperties;
   private final boolean combineText;
   private final boolean useCombiner;
@@ -108,7 +108,7 @@ public HadoopTuningConfig(
       final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
       final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
       final @JsonProperty("overwriteFiles") boolean overwriteFiles,
-      final @Deprecated @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
+      final @Deprecated @JsonProperty("ignoreInvalidRows") Boolean ignoreInvalidRows,
       final @JsonProperty("jobProperties") Map<String, String> jobProperties,
       final @JsonProperty("combineText") boolean combineText,
       final @JsonProperty("useCombiner") Boolean useCombiner,
@@ -138,7 +138,6 @@ public HadoopTuningConfig(
     this.leaveIntermediate = leaveIntermediate;
     this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure;
     this.overwriteFiles = overwriteFiles;
-    this.ignoreInvalidRows = ignoreInvalidRows;
     this.jobProperties = (jobProperties == null
                           ? ImmutableMap.of()
                           : ImmutableMap.copyOf(jobProperties));
@@ -152,10 +151,16 @@ public HadoopTuningConfig(
     this.useExplicitVersion = useExplicitVersion;
     this.allowedHadoopPrefix = allowedHadoopPrefix == null ? ImmutableList.of() : allowedHadoopPrefix;

-    if (!this.ignoreInvalidRows) {
-      this.maxParseExceptions = 0;
+
+    this.ignoreInvalidRows = ignoreInvalidRows;
+    if (maxParseExceptions != null) {
+      this.maxParseExceptions = maxParseExceptions;
     } else {
-      this.maxParseExceptions = maxParseExceptions == null ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS : maxParseExceptions;
+      if (ignoreInvalidRows == null || !ignoreInvalidRows) {
+        this.maxParseExceptions = 0;
+      } else {
+        this.maxParseExceptions = TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS;
+      }
     }
     this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions;
   }
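
Restated outside the constructor, the new precedence is: an explicit maxParseExceptions always wins; otherwise the deprecated ignoreInvalidRows (treated as false when absent) chooses between zero tolerance and the default cap. Previously, an unset ignoreInvalidRows defaulted to the primitive false and unconditionally forced maxParseExceptions to 0, clobbering any explicit setting. A hypothetical helper, not part of the patch, capturing the same logic:

```java
// Hypothetical restatement of the constructor logic above.
static int effectiveMaxParseExceptions(Boolean ignoreInvalidRows, Integer maxParseExceptions)
{
  if (maxParseExceptions != null) {
    return maxParseExceptions; // an explicit setting is no longer clobbered by ignoreInvalidRows
  }
  if (ignoreInvalidRows == null || !ignoreInvalidRows) {
    return 0; // strict mode: no parse exceptions tolerated
  }
  return TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS; // legacy "ignore" behavior
}
```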
@@ -221,7 +226,7 @@ public boolean isOverwriteFiles()
   }

   @JsonProperty
-  public boolean isIgnoreInvalidRows()
+  public Boolean isIgnoreInvalidRows()
   {
     return ignoreInvalidRows;
   }
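
The getter's return type follows the field: with a nullable Boolean, an "ignoreInvalidRows" that was never set round-trips through JSON as null instead of being coerced to false (the constructor above still treats null like false when deriving maxParseExceptions). A self-contained sketch of the pattern, using a stand-in class rather than the Druid config:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NullableFlagDemo
{
  // Stand-in for HadoopTuningConfig's nullable flag; not the Druid class.
  static class Flags
  {
    private final Boolean ignoreInvalidRows;

    @JsonCreator
    Flags(@JsonProperty("ignoreInvalidRows") Boolean ignoreInvalidRows)
    {
      this.ignoreInvalidRows = ignoreInvalidRows;
    }

    @JsonProperty
    public Boolean isIgnoreInvalidRows()
    {
      return ignoreInvalidRows;
    }
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // An absent property stays null; a primitive boolean would have become false.
    System.out.println(mapper.readValue("{}", Flags.class).isIgnoreInvalidRows());   // null
    System.out.println(mapper.readValue("{\"ignoreInvalidRows\":true}", Flags.class)
                             .isIgnoreInvalidRows());                                // true
  }
}
```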
@@ -286,7 +286,7 @@ private static IncrementalIndex makeIncrementalIndex(

     IncrementalIndex newIndex = new IncrementalIndex.Builder()
         .setIndexSchema(indexSchema)
-        .setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows())
+        .setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows()) // only used by OffHeapIncrementalIndex
         .setMaxRowCount(tuningConfig.getRowFlushBoundary())
         .setMaxBytesInMemory(TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()))
         .buildOnheap();
@@ -334,8 +334,7 @@ protected void setup(Context context)
     @Override
     protected void innerMap(
         InputRow inputRow,
-        Context context,
-        boolean reportParseExceptions
+        Context context
     ) throws IOException, InterruptedException
     {
       // Group by bucket, sort by timestamp
@@ -262,8 +262,7 @@ public static class MyMapper extends HadoopDruidIndexerMapper
     @Override
     protected void innerMap(
         final InputRow inputRow,
-        final Context context,
-        final boolean reportParseExceptions
+        final Context context
     )
     {
       rows.add(inputRow);
@@ -231,7 +231,7 @@ public ColumnCapabilities getColumnCapabilities(String columnName)
   private final AggregatorFactory[] metrics;
   private final AggregatorType[] aggs;
   private final boolean deserializeComplexMetrics;
-  private final boolean reportParseExceptions;
+  private final boolean reportParseExceptions; // only used by OffHeapIncrementalIndex
   private final Metadata metadata;

   private final Map<String, MetricDesc> metricDescs;
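
Per the comments added above, the flag's only remaining consumer is the off-heap index. A rough sketch of the split this implies (an assumption drawn from those comments, not from the OffheapIncrementalIndex source, which this change does not show):

```java
import io.druid.java.util.common.parsers.ParseException;

// Hypothetical illustration: with reportParseExceptions set, a bad row fails
// fast; without it, the off-heap index silently drops the row. The on-heap
// path (buildOnheap() above) no longer consults this flag.
static int handleUnparseableRow(boolean reportParseExceptions, int skippedRows)
{
  if (reportParseExceptions) {
    throw new ParseException("encountered invalid row");
  }
  return skippedRows + 1; // drop the row and keep going
}
```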