Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ private IncrementalIndex makeIncIndex(AggregatorFactory[] metrics)
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(metrics)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,6 @@ private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setConcurrentEventAdd(true)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ private IncrementalIndex makeIncIndex()
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(aggs)
.setDeserializeComplexMetrics(false)
.setReportParseExceptions(false)
.setMaxRowCount(MAX_ROWS)
.buildOnheap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ private IncrementalIndex makeIncIndex()
.withRollup(rollup)
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ private IncrementalIndex makeIncIndex()
.withRollup(rollup)
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment * 2)
.buildOnheap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,6 @@ private IncrementalIndex makeIncIndex()
.withRollup(rollup)
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ private IncrementalIndex makeIncIndex()
.withRollup(rollup)
.build()
)
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,6 @@ private IncrementalIndex makeIncIndex(boolean withRollup)
.withRollup(withRollup)
.build()
)
.setReportParseExceptions(false)
.setConcurrentEventAdd(true)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,6 @@ private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,6 @@ private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,6 @@ private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,6 @@ private IncrementalIndex makeIncIndex()
{
return new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(schemaInfo.getAggsArray())
.setReportParseExceptions(false)
.setMaxRowCount(rowsPerSegment)
.buildOnheap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -67,7 +69,7 @@ public static InputRow parse(InputRowSchema inputRowSchema, Map<String, Object>
return parse(inputRowSchema.getTimestampSpec(), inputRowSchema.getDimensionsSpec(), theMap);
}

public static InputRow parse(
private static InputRow parse(
TimestampSpec timestampSpec,
DimensionsSpec dimensionsSpec,
Map<String, Object> theMap
Expand All @@ -76,7 +78,8 @@ public static InputRow parse(
return parse(timestampSpec, dimensionsSpec.getDimensionNames(), dimensionsSpec.getDimensionExclusions(), theMap);
}

public static InputRow parse(
@VisibleForTesting
static InputRow parse(
TimestampSpec timestampSpec,
List<String> dimensions,
Set<String> dimensionExclusions,
Expand All @@ -93,23 +96,42 @@ public static InputRow parse(
final DateTime timestamp;
try {
timestamp = timestampSpec.extractTimestamp(theMap);
if (timestamp == null) {
final String input = theMap.toString();
throw new NullPointerException(
StringUtils.format(
"Null timestamp in input: %s",
input.length() < 100 ? input : input.substring(0, 100) + "..."
)
);
}
}
catch (Exception e) {
throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap);
throw new ParseException(
e,
"Timestamp[%s] is unparseable! Event: %s",
timestampSpec.getRawTimestamp(theMap),
rawMapToPrint(theMap)
);
}
if (timestamp == null) {
throw new ParseException(
"Timestamp[%s] is unparseable! Event: %s",
timestampSpec.getRawTimestamp(theMap),
rawMapToPrint(theMap)
);
}
if (!Intervals.ETERNITY.contains(timestamp)) {
Comment thread
suneet-s marked this conversation as resolved.
throw new ParseException(
"Encountered row with timestamp[%s] that cannot be represented as a long: [%s]",
timestamp,
rawMapToPrint(theMap)
);
Comment thread
suneet-s marked this conversation as resolved.
}

return new MapBasedInputRow(timestamp, dimensionsToUse, theMap);
}

/**
 * Renders a raw input map as text suitable for embedding in an error message, capping its length.
 *
 * @param rawMap the raw event map; may be null
 * @return null when {@code rawMap} is null; otherwise {@code rawMap.toString()}, shortened to its first 100
 *         characters followed by "..." when the full text is longer than 100 characters
 */
@Nullable
private static String rawMapToPrint(@Nullable Map<String, Object> rawMap)
{
  if (rawMap == null) {
    return null;
  }
  final String input = rawMap.toString();
  // Use "<=" so that a string of exactly 100 characters, from which nothing is cut, is not
  // misleadingly suffixed with the "..." truncation marker.
  return input.length() <= 100 ? input : input.substring(0, 100) + "...";
}

@JsonProperty
@Override
public ParseSpec getParseSpec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,20 @@ public DateTime getMissingValue()
return missingValue;
}

public DateTime extractTimestamp(Map<String, Object> input)
@Nullable
public DateTime extractTimestamp(@Nullable Map<String, Object> input)
{
return parseDateTime(input.get(timestampColumn));
return parseDateTime(getRawTimestamp(input));
}

public DateTime parseDateTime(Object input)
@Nullable
public Object getRawTimestamp(@Nullable Map<String, Object> input)
{
return input == null ? null : input.get(timestampColumn);
}

@Nullable
public DateTime parseDateTime(@Nullable Object input)
{
DateTime extracted = missingValue;
if (input != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,42 @@
import org.apache.druid.java.util.common.StringUtils;

/**
* ParseException can be thrown on both the ingestion side and the query side.
*
* During ingestion, ParseException can be thrown in two places, i.e., {@code InputSourceReader#read()}
* and {@code IncrementalIndex#addToFacts()}. To easily handle ParseExceptions, consider using
* {@code FilteringCloseableInputRowIterator} and {@code ParseExceptionHandler} to iterate input rows and
* to add rows to IncrementalIndex, respectively.
*
* When you use {@code InputSourceReader#sample()}, the ParseException is not thrown; it is stored in
* {@code InputRowListPlusRawValues} instead.
*
* During a query, ParseException can be thrown in SQL planning. It should never be thrown once a query plan has been
* constructed.
*/
public class ParseException extends RuntimeException
{
private boolean fromPartiallyValidRow = false;
private final boolean fromPartiallyValidRow;

public ParseException(String formatText, Object... arguments)
{
super(StringUtils.nonStrictFormat(formatText, arguments));
this.fromPartiallyValidRow = false;
}

public ParseException(Throwable cause, String formatText, Object... arguments)
public ParseException(boolean fromPartiallyValidRow, String formatText, Object... arguments)
{
super(StringUtils.nonStrictFormat(formatText, arguments), cause);
super(StringUtils.nonStrictFormat(formatText, arguments));
this.fromPartiallyValidRow = fromPartiallyValidRow;
}

public boolean isFromPartiallyValidRow()
public ParseException(Throwable cause, String formatText, Object... arguments)
{
return fromPartiallyValidRow;
this(false, StringUtils.nonStrictFormat(formatText, arguments), cause);
}

public void setFromPartiallyValidRow(boolean fromPartiallyValidRow)
public boolean isFromPartiallyValidRow()
{
this.fromPartiallyValidRow = fromPartiallyValidRow;
return fromPartiallyValidRow;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.data.input.impl;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.List;
import java.util.Set;

/**
 * Tests for {@code MapInputRowParser.parse(TimestampSpec, List, Set, Map)}: one valid row, and the
 * {@link ParseException} messages produced for unparseable, missing, and out-of-range timestamps.
 */
public class MapInputRowParserTest
{
  @Rule
  public ExpectedException expectedException = ExpectedException.none();

  // Shared fixtures: the timestamp is read from the "time" column and "dim" is the only declared dimension.
  private final TimestampSpec timestampSpec = new TimestampSpec("time", null, null);
  private final List<String> dimensions = ImmutableList.of("dim");
  private final Set<String> dimensionExclusions = ImmutableSet.of();

  /**
   * A row with a parseable timestamp yields an InputRow exposing the declared dimensions, the parsed
   * timestamp, the dimension value as a string list, and the raw metric value.
   */
  @Test
  public void testParseValidInput()
  {
    final InputRow inputRow = MapInputRowParser.parse(
        timestampSpec,
        dimensions,
        dimensionExclusions,
        ImmutableMap.of("time", "2020-01-01", "dim", 0, "met", 10)
    );
    Assert.assertEquals(dimensions, inputRow.getDimensions());
    Assert.assertEquals(DateTimes.of("2020-01-01"), inputRow.getTimestamp());
    Assert.assertEquals(ImmutableList.of("0"), inputRow.getDimension("dim"));
    Assert.assertEquals(10, inputRow.getMetric("met"));
  }

  /**
   * A timestamp value that cannot be parsed is reported with the raw value in the message.
   */
  @Test
  public void testParseInvalidTimestampThrowParseException()
  {
    expectedException.expect(ParseException.class);
    expectedException.expectMessage("Timestamp[invalid timestamp] is unparseable!");
    // The call is expected to throw, so its result is intentionally not captured.
    MapInputRowParser.parse(
        timestampSpec,
        dimensions,
        dimensionExclusions,
        ImmutableMap.of("time", "invalid timestamp", "dim", 0, "met", 10)
    );
  }

  /**
   * A row with no "time" entry is reported as an unparseable null timestamp.
   */
  @Test
  public void testParseMissingTimestampThrowParseException()
  {
    expectedException.expect(ParseException.class);
    expectedException.expectMessage("Timestamp[null] is unparseable!");
    // The call is expected to throw, so its result is intentionally not captured.
    MapInputRowParser.parse(
        timestampSpec,
        dimensions,
        dimensionExclusions,
        ImmutableMap.of("dim", 0, "met", 10)
    );
  }

  /**
   * A timestamp built from {@code JodaUtils.MIN_INSTANT - 1} is rejected as not representable as a long.
   */
  @Test
  public void testParseTimestampSmallerThanMinThrowParseException()
  {
    expectedException.expect(ParseException.class);
    expectedException.expectMessage("Encountered row with timestamp[-146136543-09-08T08:23:32.095Z] that cannot be represented as a long");
    MapInputRowParser.parse(
        timestampSpec,
        dimensions,
        dimensionExclusions,
        ImmutableMap.of("time", DateTimes.utc(JodaUtils.MIN_INSTANT - 1), "dim", 0, "met", 10)
    );
  }

  /**
   * A timestamp built from {@code JodaUtils.MAX_INSTANT + 1} is rejected as not representable as a long.
   */
  @Test
  public void testParseTimestampLargerThanMaxThrowParseException()
  {
    expectedException.expect(ParseException.class);
    expectedException.expectMessage("Encountered row with timestamp[146140482-04-24T15:36:27.904Z] that cannot be represented as a long");
    MapInputRowParser.parse(
        timestampSpec,
        dimensions,
        dimensionExclusions,
        ImmutableMap.of("time", DateTimes.utc(JodaUtils.MAX_INSTANT + 1), "dim", 0, "met", 10)
    );
  }
}
Loading