@@ -23,11 +23,9 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import io.druid.data.input.InputRow;
import io.druid.data.input.MapBasedInputRow;
import io.druid.java.util.common.parsers.ParseException;
-
import org.joda.time.DateTime;

import java.util.List;
@@ -261,6 +261,7 @@ private Map<Interval, List<ShardSpec>> determineShardSpecs(
    // determine intervals containing data and prime HLL collectors
    final Map<Interval, Optional<HyperLogLogCollector>> hllCollectors = Maps.newHashMap();
    int thrownAway = 0;
+   int unparseable = 0;

    log.info("Determining intervals and shardSpecs");
    long determineShardSpecsStartMillis = System.currentTimeMillis();
@@ -269,48 +270,61 @@ private Map<Interval, List<ShardSpec>> determineShardSpecs(
        firehoseTempDir)
    ) {
      while (firehose.hasMore()) {
-       final InputRow inputRow = firehose.nextRow();
+       try {
+         final InputRow inputRow = firehose.nextRow();

-       // The null inputRow means the caller must skip this row.
-       if (inputRow == null) {
-         continue;
-       }
+         // The null inputRow means the caller must skip this row.
+         if (inputRow == null) {
+           continue;
+         }

-       final Interval interval;
-       if (determineIntervals) {
-         interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
-       } else {
-         final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
-         if (!optInterval.isPresent()) {
-           thrownAway++;
-           continue;
-         }
-         interval = optInterval.get();
-       }
+         final Interval interval;
+         if (determineIntervals) {
+           interval = granularitySpec.getSegmentGranularity().bucket(inputRow.getTimestamp());
+         } else {
+           final Optional<Interval> optInterval = granularitySpec.bucketInterval(inputRow.getTimestamp());
+           if (!optInterval.isPresent()) {
+             thrownAway++;
+             continue;
+           }
+           interval = optInterval.get();
+         }

-       if (!determineNumPartitions) {
-         // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
-         // for the interval and don't instantiate a HLL collector
-         if (!hllCollectors.containsKey(interval)) {
-           hllCollectors.put(interval, Optional.<HyperLogLogCollector>absent());
-         }
-         continue;
-       }
+         if (!determineNumPartitions) {
+           // we don't need to determine partitions but we still need to determine intervals, so add an Optional.absent()
+           // for the interval and don't instantiate a HLL collector
+           if (!hllCollectors.containsKey(interval)) {
+             hllCollectors.put(interval, Optional.<HyperLogLogCollector>absent());
+           }
+           continue;
+         }

-       if (!hllCollectors.containsKey(interval)) {
-         hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
-       }
+         if (!hllCollectors.containsKey(interval)) {
+           hllCollectors.put(interval, Optional.of(HyperLogLogCollector.makeLatestCollector()));
+         }

-       List<Object> groupKey = Rows.toGroupKey(
-           queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
-           inputRow
-       );
-       hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
+         List<Object> groupKey = Rows.toGroupKey(
+             queryGranularity.bucketStart(inputRow.getTimestamp()).getMillis(),
+             inputRow
+         );
+         hllCollectors.get(interval).get().add(hashFunction.hashBytes(jsonMapper.writeValueAsBytes(groupKey)).asBytes());
+       }
+       catch (ParseException e) {
+         if (ingestionSchema.getTuningConfig().isReportParseExceptions()) {
+           throw e;
+         } else {
+           unparseable++;
+         }
+       }
      }
    }

+   // These metrics are reported in generateAndPublishSegments()
    if (thrownAway > 0) {
-     log.warn("Unable to to find a matching interval for [%,d] events", thrownAway);
+     log.warn("Unable to find a matching interval for [%,d] events", thrownAway);
    }
+   if (unparseable > 0) {
+     log.warn("Unable to parse [%,d] events", unparseable);
+   }

    final ImmutableSortedMap<Interval, Optional<HyperLogLogCollector>> sortedMap = ImmutableSortedMap.copyOf(
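The hunk above is the heart of this PR: determineShardSpecs() now honors the reportParseExceptions tuning flag instead of letting a ParseException from the firehose abort the interval/shard-count pass outright. The skip-vs-rethrow pattern can be read in isolation; below is a minimal, self-contained sketch of it, where ParseException and consume() are illustrative stand-ins, not Druid's io.druid.java.util.common.parsers.ParseException or any Druid API.

import java.util.Iterator;
import java.util.List;

public class ParseExceptionPolicyDemo
{
  // Stand-in for io.druid.java.util.common.parsers.ParseException.
  static class ParseException extends RuntimeException
  {
    ParseException(String message) { super(message); }
  }

  // Mirrors the loop above: with reportParseExceptions=true the first bad row
  // aborts processing; with false, bad rows are counted and skipped.
  static int consume(Iterator<String> rows, boolean reportParseExceptions)
  {
    int unparseable = 0;
    while (rows.hasNext()) {
      try {
        final String row = rows.next();
        if (!row.matches("\\d{4}-.*")) {   // hypothetical "parse" step
          throw new ParseException("Unparseable timestamp found!");
        }
        // ... interval bucketing and HLL priming would happen here ...
      }
      catch (ParseException e) {
        if (reportParseExceptions) {
          throw e;
        } else {
          unparseable++;
        }
      }
    }
    return unparseable;
  }

  public static void main(String[] args)
  {
    final List<String> rows = List.of("2014-01-01T00:00:10Z,a,1", "unparseable,a,1");
    System.out.println(consume(rows.iterator(), false)); // prints 1
    consume(rows.iterator(), true);                      // throws ParseException
  }
}

Counting instead of throwing keeps a single malformed row from failing a long ingestion, while the warn log above still surfaces how many rows were dropped.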
@@ -37,8 +37,10 @@
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.IndexTask.IndexIngestionSpec;
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.java.util.common.granularity.Granularities;
+import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.IndexIO;
@@ -62,6 +64,7 @@
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

import java.io.BufferedWriter;
@@ -79,6 +82,9 @@ public class IndexTaskTest
  @Rule
  public TemporaryFolder temporaryFolder = new TemporaryFolder();

+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
  private static final ParseSpec DEFAULT_PARSE_SPEC = new CSVParseSpec(
      new TimestampSpec(
          "ts",
@@ -446,7 +452,6 @@ public void testCSVFileWithHeaderColumnOverride() throws Exception
      writer.write("2014-01-01T00:00:10Z,a,1\n");
    }

-
    IndexTask indexTask = new IndexTask(
        null,
        null,
@@ -487,6 +492,115 @@ public void testCSVFileWithHeaderColumnOverride() throws Exception
    Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
  }
+
+  @Test
+  public void testIgnoreParseException() throws Exception
+  {
+    final File tmpDir = temporaryFolder.newFolder();
+
+    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+    try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+      writer.write("time,d,val\n");
+      writer.write("unparseable,a,1\n");
+      writer.write("2014-01-01T00:00:10Z,a,1\n");
+    }
+
+    // GranularitySpec.intervals and numShards must be null to verify reportParseException=false is respected both in
+    // IndexTask.determineShardSpecs() and IndexTask.generateAndPublishSegments()
+    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
+        tmpDir,
+        new CSVParseSpec(
+            new TimestampSpec(
+                "time",
+                "auto",
+                null
+            ),
+            new DimensionsSpec(
+                null,
+                Lists.<String>newArrayList(),
+                Lists.<SpatialDimensionSchema>newArrayList()
+            ),
+            null,
+            Arrays.asList("time", "dim", "val"),
+            true,
+            0
+        ),
+        null,
+        2,
+        null,
+        false,
+        false,
+        false // ignore parse exception
+    );
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        parseExceptionIgnoreSpec,
+        null,
+        jsonMapper
+    );
+
+    final List<DataSegment> segments = runTask(indexTask);
+
+    Assert.assertEquals(Arrays.asList("d"), segments.get(0).getDimensions());
+    Assert.assertEquals(Arrays.asList("val"), segments.get(0).getMetrics());
+    Assert.assertEquals(new Interval("2014/P1D"), segments.get(0).getInterval());
+  }
+
+  @Test
+  public void testReportParseException() throws Exception
+  {
+    expectedException.expect(ParseException.class);
+    expectedException.expectMessage("Unparseable timestamp found!");
+
+    final File tmpDir = temporaryFolder.newFolder();
+
+    final File tmpFile = File.createTempFile("druid", "index", tmpDir);
+
+    try (BufferedWriter writer = Files.newWriter(tmpFile, StandardCharsets.UTF_8)) {
+      writer.write("time,d,val\n");
+      writer.write("unparseable,a,1\n");
+      writer.write("2014-01-01T00:00:10Z,a,1\n");
+    }
+
+    final IndexIngestionSpec parseExceptionIgnoreSpec = createIngestionSpec(
+        tmpDir,
+        new CSVParseSpec(
+            new TimestampSpec(
+                "time",
+                "auto",
+                null
+            ),
+            new DimensionsSpec(
+                null,
+                Lists.<String>newArrayList(),
+                Lists.<SpatialDimensionSchema>newArrayList()
+            ),
+            null,
+            Arrays.asList("time", "dim", "val"),
+            true,
+            0
+        ),
+        null,
+        2,
+        null,
+        false,
+        false,
+        true // report parse exception
+    );
+
+    IndexTask indexTask = new IndexTask(
+        null,
+        null,
+        parseExceptionIgnoreSpec,
+        null,
+        jsonMapper
+    );
+
+    runTask(indexTask);
+  }

  private final List<DataSegment> runTask(final IndexTask indexTask) throws Exception
  {
    final List<DataSegment> segments = Lists.newArrayList();
@@ -577,6 +691,29 @@ private IndexTask.IndexIngestionSpec createIngestionSpec(
      boolean forceExtendableShardSpecs,
      boolean appendToExisting
  )
+ {
+   return createIngestionSpec(
+       baseDir,
+       parseSpec,
+       granularitySpec,
+       targetPartitionSize,
+       numShards,
+       forceExtendableShardSpecs,
+       appendToExisting,
+       true
+   );
+ }
+
+ private IndexTask.IndexIngestionSpec createIngestionSpec(
+     File baseDir,
+     ParseSpec parseSpec,
+     GranularitySpec granularitySpec,
+     Integer targetPartitionSize,
+     Integer numShards,
+     boolean forceExtendableShardSpecs,
+     boolean appendToExisting,
+     boolean reportParseException
+ )
  {
    return new IndexTask.IndexIngestionSpec(
        new DataSchema(
@@ -614,7 +751,7 @@ private IndexTask.IndexIngestionSpec createIngestionSpec(
        null,
        true,
        forceExtendableShardSpecs,
-       true,
+       reportParseException,
        null
    )
  );
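
The new testReportParseException() relies on JUnit 4's ExpectedException rule added at the top of the test class. For readers unfamiliar with the rule, here is a minimal standalone example, unrelated to Druid: expectations are declared before the code that should throw, and the rule checks the exception type and a message substring when the test method exits.

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class ExpectedExceptionRuleDemo
{
  @Rule
  public ExpectedException expectedException = ExpectedException.none();

  @Test
  public void testThrows()
  {
    // Declare the expectation before the statement that is supposed to throw;
    // the rule verifies both the exception type and the message substring.
    expectedException.expect(IllegalStateException.class);
    expectedException.expectMessage("boom");

    throw new IllegalStateException("boom");
  }
}

If the expected exception is never thrown, the test fails; that is why runTask(indexTask) is the last statement of testReportParseException() above.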