diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index d4fee282bde5..5082a8060cb4 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -65,6 +65,7 @@ import io.druid.indexing.kafka.supervisor.KafkaSupervisor; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Intervals; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.collect.Utils; import io.druid.java.util.common.concurrent.Execs; @@ -2015,6 +2016,19 @@ private boolean withinMinMaxRecordTime(final InputRow row) final boolean afterMaximumMessageTime = ioConfig.getMaximumMessageTime().isPresent() && ioConfig.getMaximumMessageTime().get().isBefore(row.getTimestamp()); + if (!Intervals.ETERNITY.contains(row.getTimestamp())) { + final String errorMsg = StringUtils.format( + "Encountered row with timestamp that cannot be represented as a long: [%s]", + row + ); + log.debug(errorMsg); + if (tuningConfig.isReportParseExceptions()) { + throw new ParseException(errorMsg); + } else { + return false; + } + } + if (log.isDebugEnabled()) { if (beforeMinimumMessageTime) { log.debug( diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 03d9f95cbb78..34fcc15f2ff6 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -240,6 +240,7 @@ private static List> generateRecords(String topic new ProducerRecord(topic, 0, null, JB("2010", "c", "y", 1.0f)), new ProducerRecord(topic, 0, null, JB("2011", "d", "y", 1.0f)), new ProducerRecord(topic, 0, null, JB("2011", "e", "y", 1.0f)), + new ProducerRecord(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", 1.0f)), new ProducerRecord(topic, 0, null, StringUtils.toUtf8("unparseable")), new ProducerRecord(topic, 0, null, null), new ProducerRecord(topic, 0, null, JB("2013", "f", "y", 1.0f)), @@ -455,7 +456,7 @@ public void testIncrementalHandOff() throws Exception // of events fetched across two partitions from Kafka final KafkaPartitions checkpoint1 = new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 0L)); final KafkaPartitions checkpoint2 = new KafkaPartitions(topic, ImmutableMap.of(0, 4L, 1, 2L)); - final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 8L, 1, 2L)); + final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 9L, 1, 2L)); final KafkaIndexTask task = createTask( null, new KafkaIOConfig( @@ -493,7 +494,7 @@ public void testIncrementalHandOff() throws Exception // Check metrics Assert.assertEquals(8, task.getFireDepartmentMetrics().processed()); Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(2, task.getFireDepartmentMetrics().thrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2008/P1D", 0); @@ -505,7 +506,7 @@ public void testIncrementalHandOff() throws Exception SegmentDescriptor desc7 = SD(task, "2013/P1D", 0); Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors()); Assert.assertEquals( - new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 8L, 1, 2L))), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 9L, 1, 2L))), metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) ); @@ -969,7 +970,7 @@ public void testRunConflicting() throws Exception new KafkaIOConfig( "sequence1", new KafkaPartitions(topic, ImmutableMap.of(0, 3L)), - new KafkaPartitions(topic, ImmutableMap.of(0, 8L)), + new KafkaPartitions(topic, ImmutableMap.of(0, 9L)), kafkaServer.consumerProperties(), true, false, @@ -1000,7 +1001,7 @@ public void testRunConflicting() throws Exception Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed()); Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(2, task2.getFireDepartmentMetrics().thrownAway()); // Check published segments & metadata, should all be from the first task SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0); @@ -1038,7 +1039,7 @@ public void testRunConflictingWithoutTransactions() throws Exception new KafkaIOConfig( "sequence1", new KafkaPartitions(topic, ImmutableMap.of(0, 3L)), - new KafkaPartitions(topic, ImmutableMap.of(0, 8L)), + new KafkaPartitions(topic, ImmutableMap.of(0, 9L)), kafkaServer.consumerProperties(), false, false, @@ -1075,7 +1076,7 @@ public void testRunConflictingWithoutTransactions() throws Exception Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway()); Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed()); Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(2, task2.getFireDepartmentMetrics().thrownAway()); // Check published segments & metadata SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1); @@ -1448,7 +1449,7 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); // try again but with resume flag == true - newEndOffsets = ImmutableMap.of(0, 6L); + newEndOffsets = ImmutableMap.of(0, 7L); task.setEndOffsets(newEndOffsets, true, true); Assert.assertEquals(newEndOffsets, task.getEndOffsets()); Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); @@ -1467,7 +1468,7 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception // Check metrics Assert.assertEquals(4, task.getFireDepartmentMetrics().processed()); Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway()); // Check published metadata SegmentDescriptor desc1 = SD(task, "2009/P1D", 0); @@ -1475,7 +1476,7 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception SegmentDescriptor desc3 = SD(task, "2011/P1D", 0); Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors()); Assert.assertEquals( - new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 6L))), + new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 7L))), metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) );