@@ -65,6 +65,7 @@
 import io.druid.indexing.kafka.supervisor.KafkaSupervisor;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.collect.Utils;
 import io.druid.java.util.common.concurrent.Execs;
@@ -2015,6 +2016,19 @@ private boolean withinMinMaxRecordTime(final InputRow row)
     final boolean afterMaximumMessageTime = ioConfig.getMaximumMessageTime().isPresent()
                                             && ioConfig.getMaximumMessageTime().get().isBefore(row.getTimestamp());
 
+    if (!Intervals.ETERNITY.contains(row.getTimestamp())) {
+      final String errorMsg = StringUtils.format(
+          "Encountered row with timestamp that cannot be represented as a long: [%s]",
+          row
+      );
+      log.debug(errorMsg);
+      if (tuningConfig.isReportParseExceptions()) {
+        throw new ParseException(errorMsg);
+      } else {
+        return false;
+      }
+    }
+
     if (log.isDebugEnabled()) {
       if (beforeMinimumMessageTime) {
         log.debug(
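Note on the guard above: Intervals.ETERNITY is the interval Druid uses to bound representable timestamps, so a row whose parsed timestamp falls outside it is dropped (and counted as thrownAway) unless reportParseExceptions is enabled, in which case it surfaces as a ParseException. A minimal stand-alone sketch of that behavior follows; the class name and the exact ParseException package are assumptions for illustration, not part of this patch.

import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.parsers.ParseException; // assumed package for ParseException
import org.joda.time.DateTime;

// Illustrative only: mirrors the new check without the surrounding
// ioConfig/tuningConfig plumbing of withinMinMaxRecordTime.
class EternityGuardSketch
{
  static boolean isRepresentable(DateTime timestamp, boolean reportParseExceptions)
  {
    if (!Intervals.ETERNITY.contains(timestamp)) {
      final String errorMsg = StringUtils.format("Timestamp outside representable range: [%s]", timestamp);
      if (reportParseExceptions) {
        throw new ParseException(errorMsg);
      }
      return false; // caller counts the row as thrownAway
    }
    return true;
  }
}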
@@ -240,6 +240,7 @@ private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic
         new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2010", "c", "y", 1.0f)),
         new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "d", "y", 1.0f)),
         new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "e", "y", 1.0f)),
+        new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", 1.0f)),
         new ProducerRecord<byte[], byte[]>(topic, 0, null, StringUtils.toUtf8("unparseable")),
         new ProducerRecord<byte[], byte[]>(topic, 0, null, null),
         new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2013", "f", "y", 1.0f)),
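The new record's timestamp, 246140482-04-24T15:36:27.903Z, is chosen so that joda-time can still parse it while the resulting instant falls outside Intervals.ETERNITY; that is why the end offsets and thrownAway counts in the assertions further down each move by one. A rough sanity check, under the assumption that ETERNITY's upper bound is Long.MAX_VALUE / 2 milliseconds since the epoch:

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

// Hypothetical check, not part of the patch: confirms the far-future record
// parses but exceeds the assumed upper bound of Intervals.ETERNITY.
public class FarFutureTimestampCheck
{
  public static void main(String[] args)
  {
    DateTime farFuture = new DateTime("246140482-04-24T15:36:27.903Z", DateTimeZone.UTC);
    long assumedMaxInstant = Long.MAX_VALUE / 2;                   // roughly 146 million years from now
    System.out.println(farFuture.getMillis());                     // on the order of 7.7e18 ms
    System.out.println(farFuture.getMillis() > assumedMaxInstant); // expected: true
  }
}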
@@ -455,7 +456,7 @@ public void testIncrementalHandOff() throws Exception
     // of events fetched across two partitions from Kafka
     final KafkaPartitions checkpoint1 = new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 0L));
     final KafkaPartitions checkpoint2 = new KafkaPartitions(topic, ImmutableMap.of(0, 4L, 1, 2L));
-    final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 8L, 1, 2L));
+    final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 9L, 1, 2L));
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
@@ -493,7 +494,7 @@ public void testIncrementalHandOff() throws Exception
     // Check metrics
     Assert.assertEquals(8, task.getFireDepartmentMetrics().processed());
     Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable());
-    Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway());
+    Assert.assertEquals(2, task.getFireDepartmentMetrics().thrownAway());
 
     // Check published metadata
     SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
@@ -505,7 +506,7 @@ public void testIncrementalHandOff() throws Exception
     SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 8L, 1, 2L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 9L, 1, 2L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );
 
@@ -969,7 +970,7 @@ public void testRunConflicting() throws Exception
         new KafkaIOConfig(
             "sequence1",
             new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
-            new KafkaPartitions(topic, ImmutableMap.of(0, 8L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 9L)),
             kafkaServer.consumerProperties(),
             true,
             false,
@@ -1000,7 +1001,7 @@ Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
     Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
     Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed());
     Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable());
-    Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway());
+    Assert.assertEquals(2, task2.getFireDepartmentMetrics().thrownAway());
 
     // Check published segments & metadata, should all be from the first task
     SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
@@ -1038,7 +1039,7 @@ public void testRunConflictingWithoutTransactions() throws Exception
         new KafkaIOConfig(
             "sequence1",
             new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
-            new KafkaPartitions(topic, ImmutableMap.of(0, 8L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 9L)),
             kafkaServer.consumerProperties(),
             false,
             false,
@@ -1075,7 +1076,7 @@ public void testRunConflictingWithoutTransactions() throws Exception
     Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
     Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed());
     Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable());
-    Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway());
+    Assert.assertEquals(2, task2.getFireDepartmentMetrics().thrownAway());
 
     // Check published segments & metadata
     SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1);
@@ -1448,7 +1449,7 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception
     Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
 
     // try again but with resume flag == true
-    newEndOffsets = ImmutableMap.of(0, 6L);
+    newEndOffsets = ImmutableMap.of(0, 7L);
     task.setEndOffsets(newEndOffsets, true, true);
     Assert.assertEquals(newEndOffsets, task.getEndOffsets());
     Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
@@ -1467,15 +1468,15 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception
     // Check metrics
     Assert.assertEquals(4, task.getFireDepartmentMetrics().processed());
     Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable());
-    Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
+    Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway());
 
     // Check published metadata
     SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
     SegmentDescriptor desc2 = SD(task, "2010/P1D", 0);
     SegmentDescriptor desc3 = SD(task, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 6L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 7L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );
 