From 62ba861993f4e2caf7d68ccc6eedfbd2ef331413 Mon Sep 17 00:00:00 2001 From: Parag Jain Date: Mon, 8 Jan 2018 20:01:36 -0600 Subject: [PATCH] Fix state check bug in Kafka Index Task (#5204) * fix state check for replacement task * fix comments * rebase with master --- .../druid/indexing/kafka/KafkaIndexTask.java | 5 +- .../kafka/supervisor/KafkaSupervisor.java | 2 +- .../indexing/kafka/KafkaIndexTaskTest.java | 116 ++++++++++++++++++ .../overlord/supervisor/Supervisor.java | 10 +- 4 files changed, 125 insertions(+), 8 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 5082a8060cb4..4a2ecbbae10c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -512,13 +512,14 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception final Object restoredMetadata = driver.startJob(); if (restoredMetadata == null) { // no persist has happened so far + // so either this is a brand new task or replacement of a failed task Preconditions.checkState(sequences.get(0).startOffsets.entrySet().stream().allMatch( partitionOffsetEntry -> Longs.compare( partitionOffsetEntry.getValue(), ioConfig.getStartPartitions() .getPartitionOffsetMap() .get(partitionOffsetEntry.getKey()) - ) == 0 + ) >= 0 ), "Sequence offsets are not compatible with start offsets of task"); nextOffsets.putAll(sequences.get(0).startOffsets); } else { @@ -545,7 +546,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception ioConfig.getStartPartitions().getPartitionOffsetMap().keySet() ); } - // sequences size can 0 only when all sequences got published and task stopped before it could finish + // sequences size can be 0 only when all sequences got published and task 
stopped before it could finish // which is super rare if (sequences.size() == 0 || sequences.get(sequences.size() - 1).isCheckpointed()) { this.endOffsets.putAll(sequences.size() == 0 diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index d0b8ea55c241..2a36f251d0a0 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -670,7 +670,7 @@ public void handle() throws ExecutionException, InterruptedException, TimeoutExc // as when the task starts they are sent existing checkpoints Preconditions.checkState( checkpoints.size() <= 1, - "Got checkpoint request with null as previous check point, however found more than one checkpoints in metadata store" + "Got checkpoint request with null as previous check point, however found more than one checkpoints" ); if (checkpoints.size() == 1) { log.info("Already checkpointed with dataSourceMetadata [%s]", checkpoints.get(0)); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index c2e1700df8a3..40986a95aab3 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -155,10 +155,12 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.TreeMap; import 
java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -1557,6 +1559,72 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva } } + @Test(timeout = 60_000L) + public void testRunContextSequenceAheadOfStartingOffsets() throws Exception + { + // This tests the case when a replacement task is created in place of a failed task + // which has done some incremental handoffs, thus the context will contain starting + // sequence offsets from which the task should start reading and ignore the start offsets + if (!isIncrementalHandoffSupported) { + return; + } + // Insert data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : records) { + kafkaProducer.send(record).get(); + } + } + + final TreeMap<Integer, Map<Integer, Long>> sequences = new TreeMap<>(); + // Here the sequence number is 1 meaning that one incremental handoff was done by the failed task + // and this task should start reading from offset 2 for partition 0 + sequences.put(1, ImmutableMap.of(0, 2L)); + final Map context = new HashMap<>(); + context.put("checkpoints", objectMapper.writerWithType(new TypeReference<TreeMap<Integer, Map<Integer, Long>>>() + { + }).writeValueAsString(sequences)); + + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + "sequence0", + // task should ignore these and use sequence info sent in the context + new KafkaPartitions(topic, ImmutableMap.of(0, 0L)), + new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true, + false, + null, + null, + false + ), + context + ); + + final ListenableFuture future = runTask(task); + + // Wait for task to exit + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + + // 
Check published metadata + SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2011/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 5L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); + } + private ListenableFuture runTask(final Task task) { try { @@ -1615,6 +1683,15 @@ private KafkaIndexTask createTask( return createTask(taskId, DATA_SCHEMA, ioConfig); } + private KafkaIndexTask createTask( + final String taskId, + final KafkaIOConfig ioConfig, + final Map context + ) + { + return createTask(taskId, DATA_SCHEMA, ioConfig, context); + } + private KafkaIndexTask createTask( final String taskId, final DataSchema dataSchema, @@ -1651,6 +1728,45 @@ private KafkaIndexTask createTask( return task; } + + private KafkaIndexTask createTask( + final String taskId, + final DataSchema dataSchema, + final KafkaIOConfig ioConfig, + final Map context + ) + { + final KafkaTuningConfig tuningConfig = new KafkaTuningConfig( + 1000, + maxRowsPerSegment, + new Period("P1Y"), + null, + null, + null, + true, + reportParseExceptions, + handoffConditionTimeout, + resetOffsetAutomatically, + null + ); + if (isIncrementalHandoffSupported) { + context.put(KafkaSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true); + } + + final KafkaIndexTask task = new KafkaIndexTask( + taskId, + null, + cloneDataSchema(dataSchema), + tuningConfig, + ioConfig, + context, + null, + null + ); + task.setPollRetryMs(POLL_RETRY_MS); + return task; + } + private static DataSchema cloneDataSchema(final DataSchema dataSchema) { return new DataSchema( diff --git 
a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java index c6c411480683..681421b48002 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java @@ -40,14 +40,14 @@ public interface Supervisor void reset(DataSourceMetadata dataSourceMetadata); /** - * The definition of checkpoint is not very strict as currently it does not affect data or control path + * The definition of checkpoint is not very strict as currently it does not affect data or control path. * On this call Supervisor can potentially checkpoint data processed so far to some durable storage * for example - Kafka Supervisor uses this to merge and handoff segments containing at least the data - * represented by dataSourceMetadata + * represented by {@code currentCheckPoint} DataSourceMetadata * - * @param sequenceName unique Identifier to figure out for which sequence to do check pointing - * @param previousCheckPoint DataSourceMetadata check pointed in previous call - * @param currentCheckPoint current DataSourceMetadata to be check pointed + * @param sequenceName unique Identifier to figure out for which sequence to do checkpointing + * @param previousCheckPoint DataSourceMetadata checkpointed in previous call + * @param currentCheckPoint current DataSourceMetadata to be checkpointed */ void checkpoint( @Nullable String sequenceName,