From 7aabe7736d0d8cd4c71fd0e6c73b931021df38b8 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 22 Jun 2018 13:43:36 -0700 Subject: [PATCH 1/4] Fix broken Appenderator contract in KafkaIndexTask --- ...ementalPublishingKafkaIndexTaskRunner.java | 452 +++++++++--------- .../druid/indexing/kafka/KafkaIOConfig.java | 11 - .../druid/indexing/kafka/KafkaIndexTask.java | 6 - .../indexing/kafka/KafkaIndexTaskClient.java | 27 +- .../indexing/kafka/KafkaIndexTaskRunner.java | 3 +- .../kafka/LegacyKafkaIndexTaskRunner.java | 114 ++--- .../kafka/supervisor/KafkaSupervisor.java | 5 +- .../indexing/kafka/KafkaIOConfigTest.java | 14 +- .../kafka/KafkaIndexTaskClientTest.java | 85 +--- .../indexing/kafka/KafkaIndexTaskTest.java | 129 +---- .../kafka/supervisor/KafkaSupervisorTest.java | 9 +- .../indexing/common/task/AbstractTask.java | 2 +- .../io/druid/indexing/common/task/Task.java | 1 + .../indexing/common/task/IndexTaskTest.java | 2 +- .../appenderator/SegmentsAndMetadata.java | 3 +- .../StreamAppenderatorDriver.java | 3 +- 16 files changed, 295 insertions(+), 571 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index df2f768662d5..44d5efd4a086 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -29,15 +30,13 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; @@ -63,7 +62,6 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.collect.Utils; -import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.parsers.ParseException; import io.druid.java.util.emitter.EmittingLogger; import io.druid.segment.indexing.RealtimeIOConfig; @@ -102,6 +100,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -109,16 +108,13 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import 
java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -133,7 +129,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask private static final String METADATA_NEXT_PARTITIONS = "nextPartitions"; private static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions"; - private final Map endOffsets = new ConcurrentHashMap<>(); + private final Map endOffsets; private final Map nextOffsets = new ConcurrentHashMap<>(); private final Map lastPersistedOffsets = new ConcurrentHashMap<>(); @@ -183,10 +179,7 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask private final RowIngestionMeters rowIngestionMeters; private final Set publishingSequences = Sets.newConcurrentHashSet(); - private final BlockingQueue publishQueue = new LinkedBlockingQueue<>(); - private final List> handOffWaitList = new CopyOnWriteArrayList<>(); // to prevent concurrency visibility issue - private final CountDownLatch waitForPublishes = new CountDownLatch(1); - private final AtomicReference throwableAtomicReference = new AtomicReference<>(); + private final List> handOffWaitList = new ArrayList<>(); private volatile DateTime startTime; private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread) @@ -198,12 +191,10 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask private volatile IngestionState ingestionState; private volatile boolean pauseRequested = false; - private volatile long pauseMillis = 0; private volatile long nextCheckpointTime; private volatile CopyOnWriteArrayList sequences; - private volatile File sequencesPersistFile; - private ListeningExecutorService publishExecService; + private volatile Throwable backgroundThreadException; public IncrementalPublishingKafkaIndexTaskRunner( KafkaIndexTask task, @@ -224,7 +215,7 @@ public IncrementalPublishingKafkaIndexTaskRunner( this.topic = ioConfig.getStartPartitions().getTopic(); this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters(); - this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap()); + this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndPartitions().getPartitionOffsetMap()); this.sequences = new CopyOnWriteArrayList<>(); this.ingestionState = IngestionState.NOT_STARTED; @@ -256,49 +247,39 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception status = Status.STARTING; this.toolbox = toolbox; - final Map context = task.getContext(); - if (context != null && context.get("checkpoints") != null) { - final String checkpointsString = (String) context.get("checkpoints"); - log.info("Got checkpoints [%s]", checkpointsString); - final TreeMap> checkpoints = toolbox.getObjectMapper().readValue( - checkpointsString, - new TypeReference>>() - { - } - ); - - Iterator>> sequenceOffsets = checkpoints.entrySet().iterator(); - Map.Entry> previous = sequenceOffsets.next(); - while (sequenceOffsets.hasNext()) { - Map.Entry> current = sequenceOffsets.next(); + if 
(!restoreSequences()) { + final TreeMap> checkpoints = getCheckPointsFromContext(toolbox, task); + if (checkpoints != null) { + Iterator>> sequenceOffsets = checkpoints.entrySet().iterator(); + Map.Entry> previous = sequenceOffsets.next(); + while (sequenceOffsets.hasNext()) { + Map.Entry> current = sequenceOffsets.next(); + sequences.add(new SequenceMetadata( + previous.getKey(), + StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), + previous.getValue(), + current.getValue(), + true + )); + previous = current; + } sequences.add(new SequenceMetadata( previous.getKey(), StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), previous.getValue(), - current.getValue(), - true + endOffsets, + false + )); + } else { + sequences.add(new SequenceMetadata( + 0, + StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0), + ioConfig.getStartPartitions().getPartitionOffsetMap(), + endOffsets, + false )); - previous = current; } - sequences.add(new SequenceMetadata( - previous.getKey(), - StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), - previous.getValue(), - endOffsets, - false - )); - } else { - sequences.add(new SequenceMetadata( - 0, - StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0), - ioConfig.getStartPartitions().getPartitionOffsetMap(), - endOffsets, - false - )); } - - sequencesPersistFile = new File(toolbox.getPersistDir(), "sequences.json"); - restoreSequences(); log.info("Starting with sequences: %s", sequences); if (chatHandlerProvider.isPresent()) { @@ -333,15 +314,12 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception ) ); - try ( - final KafkaConsumer consumer = task.newConsumer() - ) { + try (final KafkaConsumer consumer = task.newConsumer()) { toolbox.getDataSegmentServerAnnouncer().announce(); toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); appenderator = task.newAppenderator(fireDepartmentMetrics, toolbox); driver = task.newDriver(appenderator, toolbox, fireDepartmentMetrics); - createAndStartPublishExecutor(); final String topic = ioConfig.getStartPartitions().getTopic(); @@ -433,7 +411,7 @@ public void run() status = Status.READING; try { while (stillReading) { - if (possiblyPause(assignment)) { + if (possiblyPause()) { // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign // partitions upon resuming. This is safe even if the end offsets have not been modified. assignment = assignPartitionsAndSeekToNext(consumer, topic); @@ -446,8 +424,7 @@ public void run() } // if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true - if (stopRequested.get() || (sequences.get(sequences.size() - 1).isCheckpointed() - && !ioConfig.isPauseAfterRead())) { + if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) { status = Status.PUBLISHING; } @@ -455,12 +432,23 @@ public void run() break; } - checkAndMaybeThrowException(); + if (backgroundThreadException != null) { + throw new RuntimeException(backgroundThreadException); + } + + // Check if any handoffFuture failed. + final List> handoffFinished = handOffWaitList + .stream() + .filter(Future::isDone) + .collect(Collectors.toList()); - if (!ioConfig.isPauseAfterRead()) { - maybePersistAndPublishSequences(committerSupplier); + for (ListenableFuture handoffFuture : handoffFinished) { + // If handoffFuture failed, the below line will throw an exception and catched by (1), and then (2) or (3). 
+ handoffFuture.get(); } + maybePersistAndPublishSequences(committerSupplier); + // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to // offset is not present in the topic-partition. This can happen if we're asking a task to read from data // that has not been written yet (which is totally legitimate). So let's wait for it to show up. @@ -471,19 +459,17 @@ public void run() catch (OffsetOutOfRangeException e) { log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage()); possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox); - stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); + stillReading = !assignment.isEmpty(); } SequenceMetadata sequenceToCheckpoint = null; for (ConsumerRecord record : records) { - if (log.isTraceEnabled()) { - log.trace( - "Got topic[%s] partition[%d] offset[%,d].", - record.topic(), - record.partition(), - record.offset() - ); - } + log.trace( + "Got topic[%s] partition[%d] offset[%,d].", + record.topic(), + record.partition(), + record.offset() + ); if (record.offset() < endOffsets.get(record.partition())) { if (record.offset() != nextOffsets.get(record.partition())) { @@ -513,12 +499,11 @@ public void run() for (InputRow row : rows) { if (row != null && task.withinMinMaxRecordTime(row)) { - SequenceMetadata sequenceToUse = null; - for (SequenceMetadata sequence : sequences) { - if (sequence.canHandle(record)) { - sequenceToUse = sequence; - } - } + final SequenceMetadata sequenceToUse = sequences + .stream() + .filter(sequenceMetadata -> sequenceMetadata.canHandle(record)) + .findFirst() + .orElse(null); if (sequenceToUse == null) { throw new ISE( @@ -586,7 +571,7 @@ public void onSuccess(@Nullable Object result) public void onFailure(Throwable t) { log.error("Persist failed, dying"); - throwableAtomicReference.set(t); + backgroundThreadException = t; } } ); @@ -603,7 +588,7 @@ public void onFailure(Throwable t) && assignment.remove(record.partition())) { log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); KafkaIndexTask.assignPartitions(consumer, topic, assignment); - stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); + stillReading = !assignment.isEmpty(); } } @@ -620,7 +605,7 @@ public void onFailure(Throwable t) sequenceToCheckpoint, sequences ); - requestPause(KafkaIndexTask.PAUSE_FOREVER); + requestPause(); if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction( task.getDataSource(), ioConfig.getBaseSequenceName(), @@ -634,6 +619,7 @@ public void onFailure(Throwable t) ingestionState = IngestionState.COMPLETED; } catch (Exception e) { + // (1) catch all exceptions while reading from kafka log.error(e, "Encountered exception in run() before persisting."); throw e; } @@ -657,16 +643,19 @@ public void onFailure(Throwable t) sequenceMetadata.updateAssignments(nextOffsets); publishingSequences.add(sequenceMetadata.getSequenceName()); // persist already done in finally, so directly add to publishQueue - publishQueue.add(sequenceMetadata); + publishAndRegisterHandoff(sequenceMetadata); } } - // add Sentinel SequenceMetadata to indicate end of all sequences - publishQueue.add(SequenceMetadata.getSentinelSequenceMetadata()); - waitForPublishes.await(); - checkAndMaybeThrowException(); + if (backgroundThreadException != null) { + throw new RuntimeException(backgroundThreadException); + } - List handedOffList = Lists.newArrayList(); + // Wait for handoff futures to complete. 
+ // Note that every publishing task (created by calling AppenderatorDriver.publish()) has a corresponding + // handoffFuture. As a result, waiting for handoff futures includes waiting for publishing tasks to complete, too. + // See publishAndRegisterHandoff() for details. + List handedOffList = Collections.emptyList(); if (tuningConfig.getHandoffConditionTimeout() == 0) { handedOffList = Futures.allAsList(handOffWaitList).get(); } else { @@ -690,8 +679,13 @@ public void onFailure(Throwable t) Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata") ); } + + appenderator.close(); } catch (InterruptedException | RejectedExecutionException e) { + // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including + // the final publishing. + Futures.allAsList(handOffWaitList).cancel(true); appenderator.closeNow(); // handle the InterruptedException that gets wrapped in a RejectedExecutionException if (e instanceof RejectedExecutionException @@ -707,14 +701,12 @@ public void onFailure(Throwable t) log.info("The task was asked to stop before completing"); } + catch (Exception e) { + // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing. + appenderator.closeNow(); + throw e; + } finally { - if (appenderator != null) { - if (throwableAtomicReference.get() != null) { - appenderator.closeNow(); - } else { - appenderator.close(); - } - } if (driver != null) { driver.close(); } @@ -722,10 +714,6 @@ public void onFailure(Throwable t) chatHandlerProvider.get().unregister(task.getId()); } - if (publishExecService != null) { - publishExecService.shutdownNow(); - } - toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); toolbox.getDataSegmentServerAnnouncer().unannounce(); } @@ -734,83 +722,110 @@ public void onFailure(Throwable t) return TaskStatus.success(task.getId()); } - private void createAndStartPublishExecutor() + private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata) { - publishExecService = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-driver")); - publishExecService.submit( - (Runnable) () -> { - while (true) { - try { - final SequenceMetadata sequenceMetadata = publishQueue.take(); - - Preconditions.checkNotNull(driver); + log.info("Publishing segments for sequence [%s]", sequenceMetadata); - if (sequenceMetadata.isSentinel()) { - waitForPublishes.countDown(); - break; - } + final ListenableFuture publishFuture = driver.publish( + sequenceMetadata.createPublisher(toolbox, ioConfig.isUseTransaction()), + sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(), + ImmutableList.of(sequenceMetadata.getSequenceName()) + ); - log.info("Publishing segments for sequence [%s]", sequenceMetadata); - - final SegmentsAndMetadata result = driver.publish( - sequenceMetadata.getPublisher(toolbox, ioConfig.isUseTransaction()), - sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(), - ImmutableList.of(sequenceMetadata.getSequenceName()) - ).get(); - - if (result == null) { - throw new ISE( - "Transaction failure publishing segments for sequence [%s]", - sequenceMetadata - ); - } else { - log.info( - "Published segments[%s] with metadata[%s].", - Joiner.on(", ").join( - result.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()) - ), - Preconditions.checkNotNull(result.getCommitMetadata(), "commitMetadata") - ); - } + // Create a handoffFuture for every publishFuture. 
The created handoffFuture must fail if publishFuture fails. + final SettableFuture handoffFuture = SettableFuture.create(); + handOffWaitList.add(handoffFuture); - sequences.remove(sequenceMetadata); - publishingSequences.remove(sequenceMetadata.getSequenceName()); - try { - persistSequences(); - } - catch (IOException e) { - log.error(e, "Unable to persist state, dying"); - Throwables.propagate(e); - } + Futures.addCallback( + publishFuture, + new FutureCallback() + { + @Override + public void onSuccess(@Nullable SegmentsAndMetadata publishedSegmentsAndMetadata) + { + if (publishedSegmentsAndMetadata == null) { + final RuntimeException e = new ISE( + "Transaction failure publishing segments for sequence [%s]", + sequenceMetadata + ); + handoffFuture.setException(e); + throw e; + } else { + log.info( + "Published segments[%s] with metadata[%s].", + publishedSegmentsAndMetadata.getSegments() + .stream() + .map(DataSegment::getIdentifier) + .collect(Collectors.toList()), + Preconditions.checkNotNull(publishedSegmentsAndMetadata.getCommitMetadata(), "commitMetadata") + ); + } - final ListenableFuture handOffFuture = driver.registerHandoff(result); - handOffWaitList.add(handOffFuture); + sequences.remove(sequenceMetadata); + publishingSequences.remove(sequenceMetadata.getSequenceName()); + try { + persistSequences(); } - catch (Throwable t) { - if ((t instanceof InterruptedException || (t instanceof RejectedExecutionException - && t.getCause() instanceof InterruptedException))) { - log.warn("Stopping publish thread as we are interrupted, probably we are shutting down"); - } else { - log.makeAlert(t, "Error in publish thread, dying").emit(); - throwableAtomicReference.set(t); - } - Futures.allAsList(handOffWaitList).cancel(true); - waitForPublishes.countDown(); - break; + catch (IOException e) { + log.error(e, "Unable to persist state, dying"); + handoffFuture.setException(e); + throw new RuntimeException(e); } + + Futures.transform( + driver.registerHandoff(publishedSegmentsAndMetadata), + new Function() + { + @Nullable + @Override + public Void apply(@Nullable SegmentsAndMetadata handoffSegmentsAndMetadata) + { + if (handoffSegmentsAndMetadata == null) { + log.warn( + "Failed to handoff segments[%s]", + publishedSegmentsAndMetadata.getSegments() + .stream() + .map(DataSegment::getIdentifier) + .collect(Collectors.toList()) + ); + } + handoffFuture.set(handoffSegmentsAndMetadata); + return null; + } + } + ); + } + + @Override + public void onFailure(Throwable t) + { + log.error(t, "Error while publishing segments for sequence[%s]", sequenceMetadata); + handoffFuture.setException(t); } } ); } - private void restoreSequences() throws IOException + private static File getSequencesPersistFile(TaskToolbox toolbox) { - Preconditions.checkNotNull(sequencesPersistFile); + return new File(toolbox.getPersistDir(), "sequences.json"); + } + + private boolean restoreSequences() throws IOException + { + final File sequencesPersistFile = getSequencesPersistFile(toolbox); if (sequencesPersistFile.exists()) { - sequences = new CopyOnWriteArrayList<>(toolbox.getObjectMapper().>readValue( - sequencesPersistFile, new TypeReference>() - { - })); + sequences = new CopyOnWriteArrayList<>( + toolbox.getObjectMapper().>readValue( + sequencesPersistFile, + new TypeReference>() + { + } + ) + ); + return true; + } else { + return false; } } @@ -821,7 +836,7 @@ private synchronized void persistSequences() throws IOException new TypeReference>() { } - ).writeValue(sequencesPersistFile, sequences); + 
).writeValue(getSequencesPersistFile(toolbox), sequences); } private Map getTaskCompletionReports(@Nullable String errorMsg) @@ -875,7 +890,7 @@ private void maybePersistAndPublishSequences(Supplier committerSuppli result, sequenceMetadata ); - publishQueue.add(sequenceMetadata); + publishAndRegisterHandoff(sequenceMetadata); } catch (InterruptedException e) { log.warn("Interrupted while persisting sequence [%s]", sequenceMetadata); @@ -916,58 +931,25 @@ private Set assignPartitionsAndSeekToNext(KafkaConsumer consumer, Strin return assignment; } - private void checkAndMaybeThrowException() - { - if (throwableAtomicReference.get() != null) { - Throwables.propagate(throwableAtomicReference.get()); - } - } - /** - * Checks if the pauseRequested flag was set and if so blocks: - * a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared - * b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared - *
<p>
- * If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the - * pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume - * and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal - * shouldResume after adjusting pauseMillis for the new value to take effect. + * Checks if the pauseRequested flag was set and if so blocks until pauseRequested is cleared. *
<p>
* Sets paused = true and signals paused so callers can be notified when the pause command has been accepted. *
<p>
- * Additionally, pauses if all partitions assignments have been read and pauseAfterRead flag is set. * * @return true if a pause request was handled, false otherwise */ - private boolean possiblyPause(Set assignment) throws InterruptedException + private boolean possiblyPause() throws InterruptedException { pauseLock.lockInterruptibly(); try { - if (ioConfig.isPauseAfterRead() && assignment.isEmpty()) { - pauseMillis = KafkaIndexTask.PAUSE_FOREVER; - pauseRequested = true; - } - if (pauseRequested) { status = Status.PAUSED; - long nanos = 0; hasPaused.signalAll(); while (pauseRequested) { - if (pauseMillis == KafkaIndexTask.PAUSE_FOREVER) { - log.info("Pausing ingestion until resumed"); - shouldResume.await(); - } else { - if (pauseMillis > 0) { - log.info("Pausing ingestion for [%,d] ms", pauseMillis); - nanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis); - pauseMillis = 0; - } - if (nanos <= 0L) { - pauseRequested = false; // timeout elapsed - } - nanos = shouldResume.awaitNanos(nanos); - } + log.info("Pausing ingestion until resumed"); + shouldResume.await(); } status = Status.READING; @@ -1060,9 +1042,8 @@ private boolean isPaused() return status == Status.PAUSED; } - private void requestPause(long pauseMillis) + private void requestPause() { - this.pauseMillis = pauseMillis; pauseRequested = true; } @@ -1088,7 +1069,7 @@ private void sendResetRequestAndWait(Map outOfRangePartiti .addData("partitions", partitionOffsetMap.keySet()) .emit(); // wait for being killed by supervisor - requestPause(KafkaIndexTask.PAUSE_FOREVER); + requestPause(); } else { log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit(); } @@ -1223,14 +1204,13 @@ public Map getEndOffsets() @Produces(MediaType.APPLICATION_JSON) public Response setEndOffsetsHTTP( Map offsets, - @QueryParam("resume") @DefaultValue("false") final boolean resume, @QueryParam("finish") @DefaultValue("true") final boolean finish, // this field is only for internal purposes, shouldn't be usually set by users @Context final HttpServletRequest req ) throws InterruptedException { authorizationCheck(req, Action.WRITE); - return setEndOffsets(offsets, resume, finish); + return setEndOffsets(offsets, finish); } @GET @@ -1274,7 +1254,6 @@ public Response getUnparseableEvents( @Override public Response setEndOffsets( Map offsets, - final boolean resume, final boolean finish // this field is only for internal purposes, shouldn't be usually set by users ) throws InterruptedException { @@ -1303,7 +1282,7 @@ public Response setEndOffsets( (latestSequence.getEndOffsets().equals(offsets) && finish)) { log.warn("Ignoring duplicate request, end offsets already set for sequences [%s]", sequences); return Response.ok(offsets).build(); - } else if (latestSequence.isCheckpointed() && !ioConfig.isPauseAfterRead()) { + } else if (latestSequence.isCheckpointed()) { return Response.status(Response.Status.BAD_REQUEST) .entity(StringUtils.format( "WTH?! 
Sequence [%s] has already endOffsets set, cannot set to [%s]", @@ -1337,7 +1316,6 @@ public Response setEndOffsets( log.info("Updating endOffsets from [%s] to [%s]", endOffsets, offsets); endOffsets.putAll(offsets); } else { - Preconditions.checkState(!ioConfig.isPauseAfterRead()); // create new sequence final SequenceMetadata newSequence = new SequenceMetadata( latestSequence.getSequenceId() + 1, @@ -1353,17 +1331,19 @@ public Response setEndOffsets( } catch (Exception e) { log.error(e, "Unable to set end offsets, dying"); - throwableAtomicReference.set(e); - Throwables.propagate(e); + backgroundThreadException = e; + // should resume to immediately finish kafka index task as failed + resume(); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(Throwables.getStackTraceAsString(e)) + .build(); } finally { pauseLock.unlock(); } } - if (resume) { - resume(); - } + resume(); return Response.ok(offsets).build(); } @@ -1394,8 +1374,6 @@ private Map> getCheckpoints() /** * Signals the ingestion loop to pause. * - * @param timeout how long to pause for before resuming in milliseconds, <= 0 means indefinitely - * * @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the * method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets * in the response body if the task successfully paused @@ -1404,16 +1382,15 @@ private Map> getCheckpoints() @Path("/pause") @Produces(MediaType.APPLICATION_JSON) public Response pauseHTTP( - @QueryParam("timeout") @DefaultValue("0") final long timeout, @Context final HttpServletRequest req ) throws InterruptedException { authorizationCheck(req, Action.WRITE); - return pause(timeout); + return pause(); } @Override - public Response pause(final long timeout) throws InterruptedException + public Response pause() throws InterruptedException { if (!(status == Status.PAUSED || status == Status.READING)) { return Response.status(Response.Status.BAD_REQUEST) @@ -1423,7 +1400,6 @@ public Response pause(final long timeout) throws InterruptedException pauseLock.lockInterruptibly(); try { - pauseMillis = timeout <= 0 ? 
KafkaIndexTask.PAUSE_FOREVER : timeout; pauseRequested = true; pollRetryLock.lockInterruptibly(); @@ -1596,22 +1572,6 @@ boolean canHandle(ConsumerRecord record) && record.offset() < endOffsets.get(record.partition()); } - private SequenceMetadata() - { - this.sequenceId = -1; - this.sequenceName = null; - this.startOffsets = null; - this.endOffsets = null; - this.assignments = null; - this.checkpointed = true; - this.sentinel = true; - } - - static SequenceMetadata getSentinelSequenceMetadata() - { - return new SequenceMetadata(); - } - @Override public String toString() { @@ -1667,7 +1627,7 @@ public void run() }; } - TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean useTransaction) + TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTransaction) { return (segments, commitMetadata) -> { final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( @@ -1702,4 +1662,24 @@ TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean useTrans }; } } + + @Nullable + private static TreeMap> getCheckPointsFromContext( + TaskToolbox toolbox, + KafkaIndexTask task + ) throws IOException + { + final String checkpointsString = task.getContextValue("checkpoints"); + if (checkpointsString != null) { + log.info("Checkpoints [%s]", checkpointsString); + return toolbox.getObjectMapper().readValue( + checkpointsString, + new TypeReference>>() + { + } + ); + } else { + return null; + } + } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java index d9a7fb20ee82..5d48fe19405b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java @@ -31,7 +31,6 @@ public class KafkaIOConfig implements IOConfig { private static final boolean DEFAULT_USE_TRANSACTION = true; - private static final boolean DEFAULT_PAUSE_AFTER_READ = false; private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false; private final String baseSequenceName; @@ -39,7 +38,6 @@ public class KafkaIOConfig implements IOConfig private final KafkaPartitions endPartitions; private final Map consumerProperties; private final boolean useTransaction; - private final boolean pauseAfterRead; private final Optional minimumMessageTime; private final Optional maximumMessageTime; private final boolean skipOffsetGaps; @@ -51,7 +49,6 @@ public KafkaIOConfig( @JsonProperty("endPartitions") KafkaPartitions endPartitions, @JsonProperty("consumerProperties") Map consumerProperties, @JsonProperty("useTransaction") Boolean useTransaction, - @JsonProperty("pauseAfterRead") Boolean pauseAfterRead, @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps @@ -62,7 +59,6 @@ public KafkaIOConfig( this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions"); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION; - this.pauseAfterRead = pauseAfterRead != null ? 
pauseAfterRead : DEFAULT_PAUSE_AFTER_READ; this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); this.maximumMessageTime = Optional.fromNullable(maximumMessageTime); this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : DEFAULT_SKIP_OFFSET_GAPS; @@ -117,12 +113,6 @@ public boolean isUseTransaction() return useTransaction; } - @JsonProperty - public boolean isPauseAfterRead() - { - return pauseAfterRead; - } - @JsonProperty public Optional getMaximumMessageTime() { @@ -150,7 +140,6 @@ public String toString() ", endPartitions=" + endPartitions + ", consumerProperties=" + consumerProperties + ", useTransaction=" + useTransaction + - ", pauseAfterRead=" + pauseAfterRead + ", minimumMessageTime=" + minimumMessageTime + ", maximumMessageTime=" + maximumMessageTime + ", skipOffsetGaps=" + skipOffsetGaps + diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index da92f4ffc764..ee31b6aa4b83 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -68,8 +68,6 @@ public class KafkaIndexTask extends AbstractTask implements ChatHandler { - public static final long PAUSE_FOREVER = -1L; - public enum Status { NOT_STARTED, @@ -86,14 +84,11 @@ public enum Status private static final Random RANDOM = new Random(); static final long POLL_TIMEOUT = 100; static final long LOCK_ACQUIRE_TIMEOUT_SECONDS = 15; - private static final String METADATA_NEXT_PARTITIONS = "nextPartitions"; - private static final String METADATA_PUBLISH_PARTITIONS = "publishPartitions"; private final DataSchema dataSchema; private final InputRowParser parser; private final KafkaTuningConfig tuningConfig; private final KafkaIOConfig ioConfig; - private final AuthorizerMapper authorizerMapper; private final Optional chatHandlerProvider; private final KafkaIndexTaskRunner runner; @@ -126,7 +121,6 @@ public KafkaIndexTask( this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig"); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); - this.authorizerMapper = authorizerMapper; final CircularBuffer savedParseExceptions; if (tuningConfig.getMaxSavedParseExceptions() > 0) { savedParseExceptions = new CircularBuffer<>(tuningConfig.getMaxSavedParseExceptions()); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java index 3d0886a566c0..26643f8be129 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java @@ -28,11 +28,11 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.indexer.TaskLocation; +import io.druid.indexer.TaskStatus; import io.druid.indexing.common.RetryPolicy; import io.druid.indexing.common.RetryPolicyConfig; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.TaskInfoProvider; -import io.druid.indexer.TaskStatus; import 
io.druid.java.util.common.IAE; import io.druid.java.util.common.IOE; import io.druid.java.util.common.ISE; @@ -167,19 +167,14 @@ public boolean resume(final String id) public Map pause(final String id) { - return pause(id, 0); - } - - public Map pause(final String id, final long timeout) - { - log.debug("Pause task[%s] timeout[%d]", id, timeout); + log.debug("Pause task[%s]", id); try { final FullResponseHolder response = submitRequest( id, HttpMethod.POST, "pause", - timeout > 0 ? StringUtils.format("timeout=%d", timeout) : null, + null, true ); @@ -361,18 +356,17 @@ public Map getEndOffsets(final String id) public boolean setEndOffsets( final String id, final Map endOffsets, - final boolean resume, final boolean finalize ) { - log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s] finalize[%s]", id, endOffsets, resume, finalize); + log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, endOffsets, finalize); try { final FullResponseHolder response = submitRequest( id, HttpMethod.POST, "offsets/end", - StringUtils.format("resume=%s&finish=%s", resume, finalize), + StringUtils.format("finish=%s", finalize), jsonMapper.writeValueAsBytes(endOffsets), true ); @@ -415,11 +409,6 @@ public Boolean call() } public ListenableFuture> pauseAsync(final String id) - { - return pauseAsync(id, 0); - } - - public ListenableFuture> pauseAsync(final String id, final long timeout) { return executorService.submit( new Callable>() @@ -427,7 +416,7 @@ public ListenableFuture> pauseAsync(final String id, final lo @Override public Map call() { - return pause(id, timeout); + return pause(id); } } ); @@ -490,7 +479,7 @@ public Map call() } public ListenableFuture setEndOffsetsAsync( - final String id, final Map endOffsets, final boolean resume, final boolean finalize + final String id, final Map endOffsets, final boolean finalize ) { return executorService.submit( @@ -499,7 +488,7 @@ public ListenableFuture setEndOffsetsAsync( @Override public Boolean call() { - return setEndOffsets(id, endOffsets, resume, finalize); + return setEndOffsets(id, endOffsets, finalize); } } ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskRunner.java index 4f84510008ea..e0535373dd38 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskRunner.java @@ -60,12 +60,11 @@ public interface KafkaIndexTaskRunner extends ChatHandler @VisibleForTesting Response setEndOffsets( Map offsets, - boolean resume, boolean finish // this field is only for internal purposes, shouldn't be usually set by users ) throws InterruptedException; @VisibleForTesting - Response pause(long timeout) throws InterruptedException; + Response pause() throws InterruptedException; @VisibleForTesting void resume() throws InterruptedException; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java index a32c1ec30dae..f057b0585176 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java 
@@ -20,14 +20,11 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Function; -import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import io.druid.data.input.Committer; @@ -81,12 +78,10 @@ import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; -import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.POST; import javax.ws.rs.Path; import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -102,6 +97,7 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -174,7 +170,6 @@ public class LegacyKafkaIndexTaskRunner implements KafkaIndexTaskRunner private volatile FireDepartmentMetrics fireDepartmentMetrics; private volatile IngestionState ingestionState; - private volatile long pauseMillis = 0; private volatile boolean pauseRequested; LegacyKafkaIndexTaskRunner( @@ -355,7 +350,7 @@ public void run() status = Status.READING; try { while (stillReading) { - if (possiblyPause(assignment)) { + if (possiblyPause()) { // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign // partitions upon resuming. This is safe even if the end offsets have not been modified. 
assignment = assignPartitionsAndSeekToNext(consumer, topic); @@ -381,7 +376,7 @@ public void run() catch (OffsetOutOfRangeException e) { log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage()); possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox); - stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); + stillReading = !assignment.isEmpty(); } for (ConsumerRecord record : records) { @@ -476,7 +471,7 @@ public void run() && assignment.remove(record.partition())) { log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); KafkaIndexTask.assignPartitions(consumer, topic, assignment); - stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); + stillReading = !assignment.isEmpty(); } } } @@ -534,32 +529,38 @@ public void run() sequenceNames.values() ).get(); + final List publishedSegments = published.getSegments() + .stream() + .map(DataSegment::getIdentifier) + .collect(Collectors.toList()); + + log.info( + "Published segments[%s] with metadata[%s].", + publishedSegments, + Preconditions.checkNotNull(published.getCommitMetadata(), "commitMetadata") + ); + final Future handoffFuture = driver.registerHandoff(published); - final SegmentsAndMetadata handedOff; + SegmentsAndMetadata handedOff = null; if (tuningConfig.getHandoffConditionTimeout() == 0) { handedOff = handoffFuture.get(); } else { - handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); + try { + handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout()) + .addData("TaskId", task.getId()) + .emit(); + } } if (handedOff == null) { - throw new ISE("Transaction failure publishing segments, aborting"); + log.warn("Failed to handoff segments[%s]", publishedSegments); } else { log.info( - "Published segments[%s] with metadata[%s].", - Joiner.on(", ").join( - Iterables.transform( - handedOff.getSegments(), - new Function() - { - @Override - public String apply(DataSegment input) - { - return input.getIdentifier(); - } - } - ) - ), + "Handoff completed for segments[%s] with metadata[%s]", + handedOff.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()), Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata") ); } @@ -627,50 +628,24 @@ private Set assignPartitionsAndSeekToNext(KafkaConsumer consumer, Strin } /** - * Checks if the pauseRequested flag was set and if so blocks: - * a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared - * b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared - *
<p>
- * If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the - * pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume - * and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal - * shouldResume after adjusting pauseMillis for the new value to take effect. + * Checks if the pauseRequested flag was set and if so blocks until pauseRequested is cleared. *
<p>
* Sets paused = true and signals paused so callers can be notified when the pause command has been accepted. *
<p>
- * Additionally, pauses if all partitions assignments have been read and pauseAfterRead flag is set. * * @return true if a pause request was handled, false otherwise */ - private boolean possiblyPause(Set assignment) throws InterruptedException + private boolean possiblyPause() throws InterruptedException { pauseLock.lockInterruptibly(); try { - if (ioConfig.isPauseAfterRead() && assignment.isEmpty()) { - pauseMillis = KafkaIndexTask.PAUSE_FOREVER; - pauseRequested = true; - } - if (pauseRequested) { status = Status.PAUSED; - long nanos = 0; hasPaused.signalAll(); while (pauseRequested) { - if (pauseMillis == KafkaIndexTask.PAUSE_FOREVER) { - log.info("Pausing ingestion until resumed"); - shouldResume.await(); - } else { - if (pauseMillis > 0) { - log.info("Pausing ingestion for [%,d] ms", pauseMillis); - nanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis); - pauseMillis = 0; - } - if (nanos <= 0L) { - pauseRequested = false; // timeout elapsed - } - nanos = shouldResume.awaitNanos(nanos); - } + log.info("Pausing ingestion until resumed"); + shouldResume.await(); } status = Status.READING; @@ -752,15 +727,14 @@ private void sendResetRequestAndWait(Map outOfRangePartiti .addData("partitions", partitionOffsetMap.keySet()) .emit(); // wait for being killed by supervisor - requestPause(KafkaIndexTask.PAUSE_FOREVER); + requestPause(); } else { log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit(); } } - private void requestPause(long pauseMillis) + private void requestPause() { - this.pauseMillis = pauseMillis; pauseRequested = true; } @@ -941,10 +915,10 @@ public Map getEndOffsets() } @Override - public Response setEndOffsets(Map offsets, boolean resume, boolean finish) throws InterruptedException + public Response setEndOffsets(Map offsets, boolean finish) throws InterruptedException { // finish is not used in this mode - return setEndOffsets(offsets, resume); + return setEndOffsets(offsets); } @POST @@ -953,12 +927,11 @@ public Response setEndOffsets(Map offsets, boolean resume, boolea @Produces(MediaType.APPLICATION_JSON) public Response setEndOffsetsHTTP( Map offsets, - @QueryParam("resume") @DefaultValue("false") final boolean resume, @Context final HttpServletRequest req ) throws InterruptedException { authorizationCheck(req, Action.WRITE); - return setEndOffsets(offsets, resume); + return setEndOffsets(offsets); } @GET @@ -1000,8 +973,7 @@ public Response getUnparseableEvents( } public Response setEndOffsets( - Map offsets, - final boolean resume + Map offsets ) throws InterruptedException { if (offsets == null) { @@ -1048,9 +1020,7 @@ public Response setEndOffsets( pauseLock.unlock(); } - if (resume) { - resume(); - } + resume(); return Response.ok(endOffsets).build(); } @@ -1063,8 +1033,6 @@ private boolean isPaused() /** * Signals the ingestion loop to pause. 
* - * @param timeout how long to pause for before resuming in milliseconds, <= 0 means indefinitely - * * @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the * method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets * in the response body if the task successfully paused @@ -1073,16 +1041,15 @@ private boolean isPaused() @Path("/pause") @Produces(MediaType.APPLICATION_JSON) public Response pauseHTTP( - @QueryParam("timeout") @DefaultValue("0") final long timeout, @Context final HttpServletRequest req ) throws InterruptedException { authorizationCheck(req, Action.WRITE); - return pause(timeout); + return pause(); } @Override - public Response pause(final long timeout) throws InterruptedException + public Response pause() throws InterruptedException { if (!(status == Status.PAUSED || status == Status.READING)) { return Response.status(Response.Status.BAD_REQUEST) @@ -1092,7 +1059,6 @@ public Response pause(final long timeout) throws InterruptedException pauseLock.lockInterruptibly(); try { - pauseMillis = timeout <= 0 ? KafkaIndexTask.PAUSE_FOREVER : timeout; pauseRequested = true; pollRetryLock.lockInterruptibly(); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 3ef27a85d6c4..97a7a38d6191 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -43,8 +43,8 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import io.druid.indexer.TaskLocation; -import io.druid.indexing.common.TaskInfoProvider; import io.druid.indexer.TaskStatus; +import io.druid.indexing.common.TaskInfoProvider; import io.druid.indexing.common.stats.RowIngestionMetersFactory; import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; @@ -1534,7 +1534,7 @@ public Map apply(List> input) log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets); for (final String taskId : setEndOffsetTaskIds) { - setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, true, finalize)); + setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize)); } List results = Futures.successfulAsList(setEndOffsetFutures) @@ -1781,7 +1781,6 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc new KafkaPartitions(ioConfig.getTopic(), endPartitions), consumerProperties, true, - false, minimumMessageTime, maximumMessageTime, ioConfig.isSkipOffsetGaps() diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java index 49a9b90033d9..22a792f7e6f2 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java @@ -71,8 +71,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals("mytopic", config.getEndPartitions().getTopic()); 
Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap()); Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties()); - Assert.assertEquals(true, config.isUseTransaction()); - Assert.assertEquals(false, config.isPauseAfterRead()); + Assert.assertTrue(config.isUseTransaction()); Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent()); Assert.assertFalse("maximumMessageTime", config.getMaximumMessageTime().isPresent()); Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps()); @@ -88,7 +87,6 @@ public void testSerdeWithNonDefaults() throws Exception + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\",\n" + " \"skipOffsetGaps\": true\n" @@ -109,8 +107,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals("mytopic", config.getEndPartitions().getTopic()); Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap()); Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties()); - Assert.assertEquals(false, config.isUseTransaction()); - Assert.assertEquals(true, config.isPauseAfterRead()); + Assert.assertFalse(config.isUseTransaction()); Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), config.getMinimumMessageTime().get()); Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get()); Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps()); @@ -125,7 +122,6 @@ public void testBaseSequenceNameRequired() throws Exception + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -145,7 +141,6 @@ public void testStartPartitionsRequired() throws Exception + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -165,7 +160,6 @@ public void testEndPartitionsRequired() throws Exception + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -185,7 +179,6 @@ public void testConsumerPropertiesRequired() throws Exception + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" 
+ " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -206,7 +199,6 @@ public void testStartAndEndTopicMatch() throws Exception + " \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -227,7 +219,6 @@ public void testStartAndEndPartitionSetMatch() throws Exception + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -248,7 +239,6 @@ public void testEndOffsetGreaterThanStart() throws Exception + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java index 75c43a099715..c8d18db4a1c4 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java @@ -28,8 +28,8 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.druid.indexer.TaskLocation; -import io.druid.indexing.common.TaskInfoProvider; import io.druid.indexer.TaskStatus; +import io.druid.indexing.common.TaskInfoProvider; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.IAE; @@ -57,6 +57,7 @@ import org.junit.runners.Parameterized; import java.net.URL; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -143,13 +144,13 @@ public void testNoTaskLocation() Assert.assertEquals(false, client.stop(TEST_ID, true)); Assert.assertEquals(false, client.resume(TEST_ID)); Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID)); - Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID, 10)); + Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID)); Assert.assertEquals(KafkaIndexTask.Status.NOT_STARTED, client.getStatus(TEST_ID)); Assert.assertEquals(null, client.getStartTime(TEST_ID)); Assert.assertEquals(ImmutableMap.of(), client.getCurrentOffsets(TEST_ID, true)); Assert.assertEquals(ImmutableMap.of(), client.getEndOffsets(TEST_ID)); - Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.of(), false, true)); - Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.of(), true, true)); + Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true)); + Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true)); verifyAll(); } @@ -447,33 +448,6 @@ public void testPause() throws Exception 
     Assert.assertEquals(10, (long) results.get(1));
   }
 
-  @Test
-  public void testPauseWithTimeout() throws Exception
-  {
-    Capture<Request> captured = Capture.newInstance();
-    expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2);
-    expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes();
-    expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
-        Futures.immediateFuture(responseHolder)
-    );
-    replayAll();
-
-    Map<Integer, Long> results = client.pause(TEST_ID, 101);
-    verifyAll();
-
-    Request request = captured.getValue();
-    Assert.assertEquals(HttpMethod.POST, request.getMethod());
-    Assert.assertEquals(
-        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/pause?timeout=101"),
-        request.getUrl()
-    );
-    Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
-
-    Assert.assertEquals(2, results.size());
-    Assert.assertEquals(1, (long) results.get(0));
-    Assert.assertEquals(10, (long) results.get(1));
-  }
-
   @Test
   public void testPauseWithSubsequentGetOffsets() throws Exception
   {
@@ -560,13 +534,13 @@ public void testSetEndOffsets() throws Exception
     );
     replayAll();
 
-    client.setEndOffsets(TEST_ID, endOffsets, false, true);
+    client.setEndOffsets(TEST_ID, endOffsets, true);
     verifyAll();
 
     Request request = captured.getValue();
     Assert.assertEquals(HttpMethod.POST, request.getMethod());
     Assert.assertEquals(
-        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=false&finish=true"),
+        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"),
         request.getUrl()
     );
     Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
@@ -585,13 +559,13 @@ public void testSetEndOffsetsAndResume() throws Exception
     );
     replayAll();
 
-    client.setEndOffsets(TEST_ID, endOffsets, true, true);
+    client.setEndOffsets(TEST_ID, endOffsets, true);
     verifyAll();
 
     Request request = captured.getValue();
     Assert.assertEquals(HttpMethod.POST, request.getMethod());
     Assert.assertEquals(
-        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=true&finish=true"),
+        new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"),
         request.getUrl()
     );
     Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id"));
@@ -739,39 +713,6 @@ public void testPauseAsync() throws Exception
     }
   }
 
-  @Test
-  public void testPauseAsyncWithTimeout() throws Exception
-  {
-    final int numRequests = TEST_IDS.size();
-    Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
-    expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
-    expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
-    expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn(
-        Futures.immediateFuture(responseHolder)
-    ).times(numRequests);
-    replayAll();
-
-    List<URL> expectedUrls = Lists.newArrayList();
-    List<Future<Map<Integer, Long>>> futures = Lists.newArrayList();
-    for (String testId : TEST_IDS) {
-      expectedUrls.add(new URL(StringUtils.format(URL_FORMATTER, TEST_HOST, TEST_PORT, testId, "pause?timeout=9")));
-      futures.add(client.pauseAsync(testId, 9));
-    }
-
-    List<Map<Integer, Long>> responses = Futures.allAsList(futures).get();
-
-    verifyAll();
-    List<Request> requests = captured.getValues();
-
-    Assert.assertEquals(numRequests, requests.size());
-    Assert.assertEquals(numRequests, responses.size());
-    for (int i = 0; i < numRequests; i++) {
-      Assert.assertEquals(HttpMethod.POST, requests.get(i).getMethod());
-      Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl()));
-      Assert.assertEquals(Maps.newLinkedHashMap(ImmutableMap.of(0, 1L)), responses.get(i));
-    }
-  }
-
   @Test
   public void testGetStatusAsync() throws Exception
   {
@@ -925,9 +866,9 @@ public void testSetEndOffsetsAsync() throws Exception
               TEST_HOST,
               TEST_PORT,
               testId,
-              StringUtils.format("offsets/end?resume=%s&finish=%s", false, true)
+              StringUtils.format("offsets/end?finish=%s", true)
           )));
-      futures.add(client.setEndOffsetsAsync(testId, endOffsets, false, true));
+      futures.add(client.setEndOffsetsAsync(testId, endOffsets, true));
     }
 
     List<Boolean> responses = Futures.allAsList(futures).get();
@@ -966,11 +907,11 @@ public void testSetEndOffsetsAsyncWithResume() throws Exception
               TEST_HOST,
               TEST_PORT,
               testId,
-              "offsets/end?resume=true&finish=true"
+              "offsets/end?finish=true"
           )
       )
   );
-      futures.add(client.setEndOffsetsAsync(testId, endOffsets, true, true));
+      futures.add(client.setEndOffsetsAsync(testId, endOffsets, true));
     }
 
     List<Boolean> responses = Futures.allAsList(futures).get();
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index 17c2a145f439..efe9d53c2bf7 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -156,6 +156,7 @@
 import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
+import javax.ws.rs.core.Response;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
@@ -382,7 +383,6 @@ public void testRunAfterDataInserted() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -424,7 +424,6 @@ public void testRunBeforeDataInserted() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -500,7 +499,6 @@ public void testIncrementalHandOff() throws Exception
         endPartitions,
         consumerProps,
         true,
-        false,
         null,
         null,
         false
@@ -513,7 +511,7 @@ public void testIncrementalHandOff() throws Exception
     final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
     Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets) || checkpoint2.getPartitionOffsetMap()
                                                                                                .equals(currentOffsets));
-    task.getRunner().setEndOffsets(currentOffsets, true, false);
+    task.getRunner().setEndOffsets(currentOffsets, false);
 
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
     Assert.assertEquals(1, checkpointRequestsHash.size());
@@ -589,7 +587,6 @@ public void testTimeBasedIncrementalHandOff() throws Exception
         endPartitions,
         consumerProps,
         true,
-        false,
         null,
         null,
         false
@@ -603,7 +600,7 @@ public void testTimeBasedIncrementalHandOff() throws Exception
     }
     final Map<Integer, Long> currentOffsets = ImmutableMap.copyOf(task.getRunner().getCurrentOffsets());
     Assert.assertTrue(checkpoint.getPartitionOffsetMap().equals(currentOffsets));
-    task.getRunner().setEndOffsets(currentOffsets, true, false);
+    task.getRunner().setEndOffsets(currentOffsets, false);
 
     Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
     Assert.assertEquals(1, checkpointRequestsHash.size());
@@ -646,7 +643,6 @@ public void testRunWithMinimumMessageTime() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         DateTimes.of("2010"),
         null,
         false
@@ -700,7 +696,6 @@ public void testRunWithMaximumMessageTime() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         DateTimes.of("2010"),
         false
@@ -764,7 +759,6 @@ public void testRunWithTransformSpec() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -824,7 +818,6 @@ public void testRunOnNothing() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 2L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -865,7 +858,6 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -917,7 +909,6 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -927,10 +918,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio
     final ListenableFuture<TaskStatus> future = runTask(task);
 
     // Wait for task to exit
-    Assert.assertEquals(
-        isIncrementalHandoffSupported ? TaskState.SUCCESS : TaskState.FAILED,
-        future.get().getStatusCode()
-    );
+    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
 
     // Check metrics
     Assert.assertEquals(3, task.getRunner().getRowIngestionMeters().getProcessed());
@@ -975,7 +963,6 @@ public void testReportParseExceptions() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 7L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1019,7 +1006,6 @@ public void testMultipleParseExceptionsSuccess() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 13L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1101,7 +1087,6 @@ public void testMultipleParseExceptionsFailure() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1161,7 +1146,6 @@ public void testRunReplicas() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1175,7 +1159,6 @@ public void testRunReplicas() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1229,7 +1212,6 @@ public void testRunConflicting() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1243,7 +1225,6 @@ public void testRunConflicting() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1298,7 +1279,6 @@ public void testRunConflictingWithoutTransactions() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         false,
-        false,
         null,
         null,
         false
@@ -1312,7 +1292,6 @@ public void testRunConflictingWithoutTransactions() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 10L)),
         kafkaServer.consumerProperties(),
         false,
-        false,
         null,
         null,
         false
@@ -1372,7 +1351,6 @@ public void testRunOneTaskTwoPartitions() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1437,7 +1415,6 @@ public void testRunTwoTasksTwoPartitions() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1451,7 +1428,6 @@ public void testRunTwoTasksTwoPartitions() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(1, 1L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1507,7 +1483,6 @@ public void testRestore() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1544,7 +1519,6 @@ public void testRestore() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1596,7 +1570,6 @@ public void testRunWithPauseAndResume() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1621,7 +1594,7 @@ public void testRunWithPauseAndResume() throws Exception
     Assert.assertEquals(KafkaIndexTask.Status.READING, task.getRunner().getStatus());
 
     Map<Integer, Long> currentOffsets = objectMapper.readValue(
-        task.getRunner().pause(0).getEntity().toString(),
+        task.getRunner().pause().getEntity().toString(),
         new TypeReference<Map<Integer, Long>>()
         {
         }
@@ -1669,93 +1642,6 @@
     Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2));
   }
 
-  @Test(timeout = 60_000L)
-  public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception
-  {
-    final KafkaIndexTask task = createTask(
-        null,
-        new KafkaIOConfig(
-            "sequence0",
-            new KafkaPartitions(topic, ImmutableMap.of(0, 1L)),
-            new KafkaPartitions(topic, ImmutableMap.of(0, 3L)),
-            kafkaServer.consumerProperties(),
-            true,
-            true,
-            null,
-            null,
-            false
-        )
-    );
-
-    final ListenableFuture<TaskStatus> future = runTask(task);
-
-    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
-      for (ProducerRecord<byte[], byte[]> record : records) {
-        kafkaProducer.send(record).get();
-      }
-    }
-
-    while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
-      Thread.sleep(25);
-    }
-
-    // reached the end of the assigned offsets and paused instead of publishing
-    Assert.assertEquals(task.getRunner().getEndOffsets(), task.getRunner().getCurrentOffsets());
-    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
-
-    Assert.assertEquals(ImmutableMap.of(0, 3L), task.getRunner().getEndOffsets());
-    Map<Integer, Long> newEndOffsets = ImmutableMap.of(0, 4L);
-    task.getRunner().setEndOffsets(newEndOffsets, false, true);
-    Assert.assertEquals(newEndOffsets, task.getRunner().getEndOffsets());
-    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
-    task.getRunner().resume();
-
-    while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
-      Thread.sleep(25);
-    }
-
-    // reached the end of the updated offsets and paused
-    Assert.assertEquals(newEndOffsets, task.getRunner().getCurrentOffsets());
-    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
-
-    // try again but with resume flag == true
-    newEndOffsets = ImmutableMap.of(0, 7L);
-    task.getRunner().setEndOffsets(newEndOffsets, true, true);
-    Assert.assertEquals(newEndOffsets, task.getRunner().getEndOffsets());
-    Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
-
-    while (task.getRunner().getStatus() != KafkaIndexTask.Status.PAUSED) {
-      Thread.sleep(25);
-    }
-
-    Assert.assertEquals(newEndOffsets, task.getRunner().getCurrentOffsets());
-    Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getRunner().getStatus());
-
-    task.getRunner().resume();
-
-    Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode());
-
-    // Check metrics
-    Assert.assertEquals(4, task.getRunner().getRowIngestionMeters().getProcessed());
-    Assert.assertEquals(2, task.getRunner().getRowIngestionMeters().getUnparseable());
-    Assert.assertEquals(0, task.getRunner().getRowIngestionMeters().getThrownAway());
-
-    // Check published metadata
-    SegmentDescriptor desc1 = SD(task, "2009/P1D", 0);
-    SegmentDescriptor desc2 = SD(task, "2010/P1D", 0);
-    SegmentDescriptor desc3 = SD(task, "2011/P1D", 0);
-    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors());
-    Assert.assertEquals(
-        new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 7L))),
-        metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
-    );
-
-    // Check segments in deep storage
-    Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc1));
-    Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc2));
-    Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc3));
-  }
-
   @Test(timeout = 30_000L)
   public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
   {
@@ -1767,7 +1653,6 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1780,7 +1665,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception
       Thread.sleep(2000);
     }
 
-    task.getRunner().pause(0);
+    task.getRunner().pause();
 
     while (!task.getRunner().getStatus().equals(KafkaIndexTask.Status.PAUSED)) {
       Thread.sleep(25);
@@ -1806,7 +1691,6 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva
         new KafkaPartitions(topic, ImmutableMap.of(0, 500L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
@@ -1860,7 +1744,6 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
         new KafkaPartitions(topic, ImmutableMap.of(0, 5L)),
         kafkaServer.consumerProperties(),
         true,
-        false,
         null,
         null,
         false
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 04861d0c6df3..236f00424fc3 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -263,7 +263,6 @@ public void testNoInitialState() throws Exception
     Assert.assertEquals("myCustomValue", taskConfig.getConsumerProperties().get("myCustomKey"));
     Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
     Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
-    Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead());
     Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
     Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent());
Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps()); @@ -1055,7 +1054,6 @@ public void testBeginPublishAndQueueNextTasks() throws Exception taskClient.setEndOffsetsAsync( EasyMock.contains("sequenceName-0"), EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)), - EasyMock.eq(true), EasyMock.eq(true) ) ).andReturn(Futures.immediateFuture(true)).times(2); @@ -1083,7 +1081,6 @@ public void testBeginPublishAndQueueNextTasks() throws Exception KafkaIOConfig taskConfig = kafkaIndexTask.getIOConfig(); Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); - Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead()); Assert.assertEquals(topic, taskConfig.getStartPartitions().getTopic()); Assert.assertEquals(10L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0)); @@ -1171,7 +1168,6 @@ public void testDiscoverExistingPublishingTask() throws Exception Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey")); Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName()); Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction()); - Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead()); // check that the new task was created with starting offsets matching where the publishing task finished Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic()); @@ -1260,7 +1256,6 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey")); Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName()); Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction()); - Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead()); // check that the new task was created with starting offsets matching where the publishing task finished Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic()); @@ -1573,7 +1568,6 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception taskClient.setEndOffsetsAsync( EasyMock.contains("sequenceName-0"), EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)), - EasyMock.eq(true), EasyMock.eq(true) ) ).andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2); @@ -1698,7 +1692,7 @@ public void testStopGracefully() throws Exception expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); expect(taskClient.pauseAsync("id2")) .andReturn(Futures.immediateFuture((Map) ImmutableMap.of(0, 15L, 1, 25L, 2, 30L))); - expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true, true)) + expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true)) .andReturn(Futures.immediateFuture(true)); taskQueue.shutdown("id3"); expectLastCall().times(2); @@ -2049,7 +2043,6 @@ private KafkaIndexTask createKafkaIndexTask( endPartitions, ImmutableMap.of(), true, - false, minimumMessageTime, maximumMessageTime, false diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java index 992b130cf883..22898a5f5cd6 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java +++ 
b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java @@ -24,8 +24,8 @@ import com.google.common.base.Joiner; import com.google.common.base.Objects; import com.google.common.base.Preconditions; -import io.druid.indexing.common.TaskLock; import io.druid.indexer.TaskStatus; +import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.actions.LockListAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.java.util.common.DateTimes; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 9e1763c8a501..416ab8593975 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -180,6 +180,7 @@ default int getPriority() */ TaskStatus run(TaskToolbox toolbox) throws Exception; + @Nullable Map getContext(); @Nullable diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 746315cfb25d..f9f354071048 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -37,12 +37,12 @@ import io.druid.data.input.impl.StringInputRowParser; import io.druid.data.input.impl.TimestampSpec; import io.druid.indexer.TaskState; +import io.druid.indexer.TaskStatus; import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.TaskReport; import io.druid.indexing.common.TaskReportFileWriter; -import io.druid.indexer.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.TestUtils; import io.druid.indexing.common.actions.LockAcquireAction; diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java index 72a7f8b325c4..74a1011abc57 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java @@ -23,12 +23,13 @@ import io.druid.timeline.DataSegment; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; import java.util.Objects; public class SegmentsAndMetadata { - private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(ImmutableList.of(), null); + private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(Collections.emptyList(), null); private final Object commitMetadata; private final ImmutableList segments; diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index ddf5593169f4..29a154747e96 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -23,7 +23,6 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; -import com.google.common.base.Throwables; import 
com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -232,7 +231,7 @@ public Object persist(final Committer committer) throws InterruptedException throw e; } catch (Exception e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } From f719d68b231077df155273bdfe3126cbc3cce254 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 26 Jun 2018 16:17:17 -0700 Subject: [PATCH 2/4] fix build --- .../kafka/IncrementalPublishingKafkaIndexTaskRunner.java | 3 +-- .../test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index 44d5efd4a086..21c14524a913 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -28,7 +28,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -729,7 +728,7 @@ private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata) final ListenableFuture publishFuture = driver.publish( sequenceMetadata.createPublisher(toolbox, ioConfig.isUseTransaction()), sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(), - ImmutableList.of(sequenceMetadata.getSequenceName()) + Collections.singletonList(sequenceMetadata.getSequenceName()) ); // Create a handoffFuture for every publishFuture. The created handoffFuture must fail if publishFuture fails. 
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
index efe9d53c2bf7..f76788d4984b 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -156,7 +156,6 @@
 import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
-import javax.ws.rs.core.Response;
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;

From 32911a7894a875c1fd6fdd800bf4dffaf3d5ebd5 Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Tue, 26 Jun 2018 16:43:18 -0700
Subject: [PATCH 3/4] add publishFuture

---
 ...ementalPublishingKafkaIndexTaskRunner.java | 105 ++++++++++++------
 1 file changed, 70 insertions(+), 35 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 21c14524a913..2d627dd13694 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -99,9 +99,9 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -109,6 +109,7 @@
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -178,7 +179,8 @@ public class IncrementalPublishingKafkaIndexTaskRunner implements KafkaIndexTask
   private final RowIngestionMeters rowIngestionMeters;
 
   private final Set<String> publishingSequences = Sets.newConcurrentHashSet();
-  private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new ArrayList<>();
+  private final List<ListenableFuture<SegmentsAndMetadata>> publishWaitList = new LinkedList<>();
+  private final List<ListenableFuture<SegmentsAndMetadata>> handOffWaitList = new LinkedList<>();
 
   private volatile DateTime startTime;
   private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread)
@@ -435,16 +437,7 @@ public void run()
           throw new RuntimeException(backgroundThreadException);
         }
 
-        // Check if any handoffFuture failed.
-        final List<ListenableFuture<SegmentsAndMetadata>> handoffFinished = handOffWaitList
-            .stream()
-            .filter(Future::isDone)
-            .collect(Collectors.toList());
-
-        for (ListenableFuture<SegmentsAndMetadata> handoffFuture : handoffFinished) {
-          // If handoffFuture failed, the below line will throw an exception and catched by (1), and then (2) or (3).
-          handoffFuture.get();
-        }
+        checkPublishAndHandoffFailure();
 
         maybePersistAndPublishSequences(committerSupplier);
 
@@ -650,9 +643,13 @@ public void onFailure(Throwable t)
         throw new RuntimeException(backgroundThreadException);
       }
 
+      // Wait for publish futures to complete.
+      Futures.allAsList(publishWaitList).get();
+
       // Wait for handoff futures to complete.
       // Note that every publishing task (created by calling AppenderatorDriver.publish()) has a corresponding
-      // handoffFuture. As a result, waiting for handoff futures includes waiting for publishing tasks to complete, too.
+      // handoffFuture. handoffFuture can throw an exception if 1) the corresponding publishFuture failed or 2) it
+      // failed to persist sequences. It might also return null if handoff failed but was recoverable.
       // See publishAndRegisterHandoff() for details.
       List<SegmentsAndMetadata> handedOffList = Collections.emptyList();
       if (tuningConfig.getHandoffConditionTimeout() == 0) {
@@ -663,6 +660,8 @@
               .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS);
         }
         catch (TimeoutException e) {
+          // A handoff timeout is not an indexing failure but a coordination failure. We simply ignore the
+          // timeout exception here.
           log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout())
              .addData("TaskId", task.getId())
              .emit();
@@ -684,6 +683,7 @@
     catch (InterruptedException | RejectedExecutionException e) {
       // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including
       // the final publishing.
+      Futures.allAsList(publishWaitList).cancel(true);
       Futures.allAsList(handOffWaitList).cancel(true);
       appenderator.closeNow();
       // handle the InterruptedException that gets wrapped in a RejectedExecutionException
@@ -702,6 +702,8 @@
     }
     catch (Exception e) {
       // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing.
+      Futures.allAsList(publishWaitList).cancel(true);
+      Futures.allAsList(handOffWaitList).cancel(true);
      appenderator.closeNow();
      throw e;
    }
@@ -721,15 +723,57 @@ public void onFailure(Throwable t)
 
     return TaskStatus.success(task.getId());
   }
 
+  private void checkPublishAndHandoffFailure() throws ExecutionException, InterruptedException
+  {
+    // Check if any publishFuture failed.
+    final List<ListenableFuture<SegmentsAndMetadata>> publishFinished = publishWaitList
+        .stream()
+        .filter(Future::isDone)
+        .collect(Collectors.toList());
+
+    for (ListenableFuture<SegmentsAndMetadata> publishFuture : publishFinished) {
+      // If publishFuture failed, the below line will throw an exception which is caught by (1), and then (2) or (3).
+      publishFuture.get();
+    }
+
+    publishWaitList.removeAll(publishFinished);
+
+    // Check if any handoffFuture failed.
+    final List<ListenableFuture<SegmentsAndMetadata>> handoffFinished = handOffWaitList
+        .stream()
+        .filter(Future::isDone)
+        .collect(Collectors.toList());
+
+    for (ListenableFuture<SegmentsAndMetadata> handoffFuture : handoffFinished) {
+      // If handoffFuture failed, the below line will throw an exception which is caught by (1), and then (2) or (3).
+      handoffFuture.get();
+    }
+
+    handOffWaitList.removeAll(handoffFinished);
+  }
+
   private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata)
   {
     log.info("Publishing segments for sequence [%s]", sequenceMetadata);
 
-    final ListenableFuture<SegmentsAndMetadata> publishFuture = driver.publish(
-        sequenceMetadata.createPublisher(toolbox, ioConfig.isUseTransaction()),
-        sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(),
-        Collections.singletonList(sequenceMetadata.getSequenceName())
+    final ListenableFuture<SegmentsAndMetadata> publishFuture = Futures.transform(
+        driver.publish(
+            sequenceMetadata.createPublisher(toolbox, ioConfig.isUseTransaction()),
+            sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(),
+            Collections.singletonList(sequenceMetadata.getSequenceName())
+        ),
+        (Function<SegmentsAndMetadata, SegmentsAndMetadata>) publishedSegmentsAndMetadata -> {
+          if (publishedSegmentsAndMetadata == null) {
+            throw new ISE(
+                "Transaction failure publishing segments for sequence [%s]",
+                sequenceMetadata
+            );
+          } else {
+            return publishedSegmentsAndMetadata;
+          }
+        }
     );
+    publishWaitList.add(publishFuture);
 
     // Create a handoffFuture for every publishFuture. The created handoffFuture must fail if publishFuture fails.
     final SettableFuture<SegmentsAndMetadata> handoffFuture = SettableFuture.create();
@@ -740,25 +784,16 @@ private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata)
         new FutureCallback<SegmentsAndMetadata>()
         {
           @Override
-          public void onSuccess(@Nullable SegmentsAndMetadata publishedSegmentsAndMetadata)
+          public void onSuccess(SegmentsAndMetadata publishedSegmentsAndMetadata)
           {
-            if (publishedSegmentsAndMetadata == null) {
-              final RuntimeException e = new ISE(
-                  "Transaction failure publishing segments for sequence [%s]",
-                  sequenceMetadata
-              );
-              handoffFuture.setException(e);
-              throw e;
-            } else {
-              log.info(
-                  "Published segments[%s] with metadata[%s].",
-                  publishedSegmentsAndMetadata.getSegments()
-                                              .stream()
-                                              .map(DataSegment::getIdentifier)
-                                              .collect(Collectors.toList()),
-                  Preconditions.checkNotNull(publishedSegmentsAndMetadata.getCommitMetadata(), "commitMetadata")
-              );
-            }
+            log.info(
+                "Published segments[%s] with metadata[%s].",
+                publishedSegmentsAndMetadata.getSegments()
+                                            .stream()
+                                            .map(DataSegment::getIdentifier)
+                                            .collect(Collectors.toList()),
+                Preconditions.checkNotNull(publishedSegmentsAndMetadata.getCommitMetadata(), "commitMetadata")
+            );
 
             sequences.remove(sequenceMetadata);
             publishingSequences.remove(sequenceMetadata.getSequenceName());

From f337d62fd5fd6245586b3bdb1a41be3d026f633d Mon Sep 17 00:00:00 2001
From: Jihoon Son
Date: Wed, 27 Jun 2018 16:44:52 -0700
Subject: [PATCH 4/4] reuse sequenceToUse if possible

---
 ...ementalPublishingKafkaIndexTaskRunner.java | 30 +++++++++----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
index 2d627dd13694..04f5f811a255 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java
@@ -489,23 +489,23 @@ public void run()
                 : parser.parseBatch(ByteBuffer.wrap(valueBytes));
             boolean isPersistRequired = false;
 
+            final SequenceMetadata sequenceToUse = sequences
+                .stream()
+                .filter(sequenceMetadata -> sequenceMetadata.canHandle(record))
+                .findFirst()
+                .orElse(null);
+
+            if (sequenceToUse == null) {
+              throw new ISE(
+                  "WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s",
+                  record.partition(),
+                  record.offset(),
+                  sequences
+              );
+            }
+
             for (InputRow row : rows) {
               if (row != null && task.withinMinMaxRecordTime(row)) {
-                final SequenceMetadata sequenceToUse = sequences
-                    .stream()
-                    .filter(sequenceMetadata -> sequenceMetadata.canHandle(record))
-                    .findFirst()
-                    .orElse(null);
-
-                if (sequenceToUse == null) {
-                  throw new ISE(
-                      "WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s",
-                      record.partition(),
-                      record.offset(),
-                      sequences
-                  );
-                }
-
                 final AppenderatorDriverAddResult addResult = driver.add(
                     row,
                     sequenceToUse.getSequenceName(),