diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index b344b8a3e9b5..6261154e7058 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -279,6 +279,7 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception ) ); + boolean publishedSuccessfully = false; try ( final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox); final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics); @@ -516,6 +517,7 @@ public boolean publishSegments(Set segments, Object commitMetadata) if (published == null) { throw new ISE("Transaction failure publishing segments, aborting"); } else { + publishedSuccessfully = true; log.info( "Published segments[%s] with metadata[%s].", Joiner.on(", ").join( @@ -548,6 +550,11 @@ public String apply(DataSegment input) throw e; } + if (!publishedSuccessfully){ + log.error("The task did NOT publish the segments successfully, check the value of [completionTimeout]"); + throw e; + } + log.info("The task was asked to stop before completing"); } finally {