From ce44791c69d034b963fc9891623100b12bf9556d Mon Sep 17 00:00:00 2001
From: Jonathan Wei
Date: Mon, 8 Jan 2018 15:40:50 -0800
Subject: [PATCH] Throw away rows with timestamps beyond long bounds in kafka
 indexing (#5215)

* Throw away rows with timestamps beyond long bounds in kafka indexing

* PR comments
---
 .../druid/indexing/kafka/KafkaIndexTask.java  | 14 ++++++++++++++
 .../indexing/kafka/KafkaIndexTaskTest.java    | 21 +++++++++++----------
 2 files changed, 25 insertions(+), 10 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
index d4fee282bde5..5082a8060cb4 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -65,6 +65,7 @@
 import io.druid.indexing.kafka.supervisor.KafkaSupervisor;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.collect.Utils;
 import io.druid.java.util.common.concurrent.Execs;
@@ -2015,6 +2016,19 @@ private boolean withinMinMaxRecordTime(final InputRow row)
     final boolean afterMaximumMessageTime = ioConfig.getMaximumMessageTime().isPresent()
                                             && ioConfig.getMaximumMessageTime().get().isBefore(row.getTimestamp());
 
+    if (!Intervals.ETERNITY.contains(row.getTimestamp())) {
+      final String errorMsg = StringUtils.format(
+          "Encountered row with timestamp that cannot be represented as a long: [%s]",
+          row
+      );
+      log.debug(errorMsg);
+      if (tuningConfig.isReportParseExceptions()) {
+        throw new ParseException(errorMsg);
+      } else {
+        return false;
+      }
+    }
+
     if (log.isDebugEnabled()) {
       if (beforeMinimumMessageTime) {
         log.debug(
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 298d1490f882..c2e1700df8a3 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -241,6 +241,7 @@ private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
       new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2010", "c", "y", 1.0f)),
       new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "d", "y", 1.0f)),
       new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2011", "e", "y", 1.0f)),
+      new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("246140482-04-24T15:36:27.903Z", "x", "z", 1.0f)),
       new ProducerRecord<byte[], byte[]>(topic, 0, null, StringUtils.toUtf8("unparseable")),
       new ProducerRecord<byte[], byte[]>(topic, 0, null, null),
       new ProducerRecord<byte[], byte[]>(topic, 0, null, JB("2013", "f", "y", 1.0f)),
@@ -456,7 +457,7 @@ public void testIncrementalHandOff() throws Exception
     // of events fetched across two partitions from Kafka
     final KafkaPartitions checkpoint1 = new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 0L));
     final KafkaPartitions checkpoint2 = new KafkaPartitions(topic, ImmutableMap.of(0, 4L, 1, 2L));
-    final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 8L, 1, 2L));
+    final KafkaPartitions endPartitions = new KafkaPartitions(topic, ImmutableMap.of(0, 9L, 1, 2L));
     final KafkaIndexTask task = createTask(
         null,
         new KafkaIOConfig(
@@ -494,7 +495,7 @@ public void testIncrementalHandOff() throws Exception
     // Check metrics
     Assert.assertEquals(8, task.getFireDepartmentMetrics().processed());
     Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable());
-    Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway());
+    Assert.assertEquals(2, task.getFireDepartmentMetrics().thrownAway());
 
     // Check published metadata
     SegmentDescriptor desc1 = SD(task, "2008/P1D", 0);
@@ -506,7 +507,7 @@ public void testIncrementalHandOff() throws Exception
     SegmentDescriptor desc7 = SD(task, "2013/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3, desc4, desc5, desc6, desc7), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 8L, 1, 2L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 9L, 1, 2L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );
 
@@ -970,7 +971,7 @@ public void testRunConflicting() throws Exception
         new KafkaIOConfig(
             "sequence1",
             new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
-            new KafkaPartitions(topic, ImmutableMap.of(0, 8L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 9L)),
             kafkaServer.consumerProperties(),
             true,
             false,
@@ -1001,7 +1002,7 @@ public void testRunConflicting() throws Exception
     Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
     Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed());
     Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable());
-    Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway());
+    Assert.assertEquals(2, task2.getFireDepartmentMetrics().thrownAway());
 
     // Check published segments & metadata, should all be from the first task
     SegmentDescriptor desc1 = SD(task1, "2010/P1D", 0);
@@ -1039,7 +1040,7 @@ public void testRunConflictingWithoutTransactions() throws Exception
         new KafkaIOConfig(
             "sequence1",
             new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
-            new KafkaPartitions(topic, ImmutableMap.of(0, 8L)),
+            new KafkaPartitions(topic, ImmutableMap.of(0, 9L)),
             kafkaServer.consumerProperties(),
             false,
             false,
@@ -1076,7 +1077,7 @@ public void testRunConflictingWithoutTransactions() throws Exception
     Assert.assertEquals(0, task1.getFireDepartmentMetrics().thrownAway());
     Assert.assertEquals(3, task2.getFireDepartmentMetrics().processed());
     Assert.assertEquals(1, task2.getFireDepartmentMetrics().unparseable());
-    Assert.assertEquals(1, task2.getFireDepartmentMetrics().thrownAway());
+    Assert.assertEquals(2, task2.getFireDepartmentMetrics().thrownAway());
 
     // Check published segments & metadata
     SegmentDescriptor desc3 = SD(task2, "2011/P1D", 1);
@@ -1449,7 +1450,7 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception
     Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
 
     // try again but with resume flag == true
-    newEndOffsets = ImmutableMap.of(0, 6L);
+    newEndOffsets = ImmutableMap.of(0, 7L);
     task.setEndOffsets(newEndOffsets, true, true);
     Assert.assertEquals(newEndOffsets, task.getEndOffsets());
     Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getStatus());
@@ -1468,7 +1469,7 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception
     // Check metrics
     Assert.assertEquals(4, task.getFireDepartmentMetrics().processed());
     Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable());
-    Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway());
+    Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway());
 
     // Check published metadata
     SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
@@ -1476,7 +1477,7 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception
     SegmentDescriptor desc3 = SD(task, "2011/P1D", 0);
     Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
     Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 6L))),
+        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 7L))),
         metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
     );
 
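
Editor's note, appended after the patch body: the new guard in withinMinMaxRecordTime() rejects any row whose parsed timestamp falls outside Intervals.ETERNITY, counting it as thrownAway (or raising a ParseException when reportParseExceptions is enabled), which is why the test adds a far-future record and bumps the expected offsets and thrownAway counts. Below is a minimal standalone sketch of the same check in plain Joda-Time, not Druid's actual implementation. The bounds of ±Long.MAX_VALUE / 2 milliseconds are an assumption mirroring what Druid's JodaUtils.MIN_INSTANT/MAX_INSTANT (which back Intervals.ETERNITY) are believed to be, chosen so that interval arithmetic on segment bounds cannot overflow a long; the class and method names are hypothetical.

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Interval;

public class TimestampBoundsSketch
{
  // Assumed stand-in for Druid's Intervals.ETERNITY: the widest interval whose
  // endpoints can safely participate in long millisecond arithmetic.
  private static final Interval ETERNITY =
      new Interval(Long.MIN_VALUE / 2, Long.MAX_VALUE / 2, DateTimeZone.UTC);

  // Mirrors the new guard: a row whose timestamp lies outside ETERNITY is
  // thrown away (or reported as a parse error) instead of being indexed.
  static boolean isRepresentable(DateTime timestamp)
  {
    return ETERNITY.contains(timestamp);
  }

  public static void main(String[] args)
  {
    // An ordinary timestamp passes the check.
    System.out.println(isRepresentable(DateTime.parse("2013-01-01T00:00:00Z")));
    // The far-future timestamp from the test parses into a valid Joda DateTime
    // (so it is not "unparseable"), but fails the bounds check.
    System.out.println(isRepresentable(DateTime.parse("246140482-04-24T15:36:27.903Z")));
  }
}

Run as written, the sketch prints true then false, matching the patched test's expectation that the far-future record is counted under thrownAway rather than unparseable.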