diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java index d9a7fb20ee82..5d48fe19405b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java @@ -31,7 +31,6 @@ public class KafkaIOConfig implements IOConfig { private static final boolean DEFAULT_USE_TRANSACTION = true; - private static final boolean DEFAULT_PAUSE_AFTER_READ = false; private static final boolean DEFAULT_SKIP_OFFSET_GAPS = false; private final String baseSequenceName; @@ -39,7 +38,6 @@ public class KafkaIOConfig implements IOConfig private final KafkaPartitions endPartitions; private final Map consumerProperties; private final boolean useTransaction; - private final boolean pauseAfterRead; private final Optional minimumMessageTime; private final Optional maximumMessageTime; private final boolean skipOffsetGaps; @@ -51,7 +49,6 @@ public KafkaIOConfig( @JsonProperty("endPartitions") KafkaPartitions endPartitions, @JsonProperty("consumerProperties") Map consumerProperties, @JsonProperty("useTransaction") Boolean useTransaction, - @JsonProperty("pauseAfterRead") Boolean pauseAfterRead, @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("skipOffsetGaps") Boolean skipOffsetGaps @@ -62,7 +59,6 @@ public KafkaIOConfig( this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions"); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION; - this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ; this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); this.maximumMessageTime = Optional.fromNullable(maximumMessageTime); this.skipOffsetGaps = skipOffsetGaps != null ? skipOffsetGaps : DEFAULT_SKIP_OFFSET_GAPS; @@ -117,12 +113,6 @@ public boolean isUseTransaction() return useTransaction; } - @JsonProperty - public boolean isPauseAfterRead() - { - return pauseAfterRead; - } - @JsonProperty public Optional getMaximumMessageTime() { @@ -150,7 +140,6 @@ public String toString() ", endPartitions=" + endPartitions + ", consumerProperties=" + consumerProperties + ", useTransaction=" + useTransaction + - ", pauseAfterRead=" + pauseAfterRead + ", minimumMessageTime=" + minimumMessageTime + ", maximumMessageTime=" + maximumMessageTime + ", skipOffsetGaps=" + skipOffsetGaps + diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 62aca366e485..16a67ec6b06c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -33,7 +33,6 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -42,8 +41,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; import io.druid.data.input.impl.InputRowParser; @@ -68,7 +66,6 @@ import io.druid.java.util.common.Intervals; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.collect.Utils; -import io.druid.java.util.common.concurrent.Execs; import io.druid.java.util.common.guava.Sequence; import io.druid.java.util.common.parsers.ParseException; import io.druid.java.util.emitter.EmittingLogger; @@ -127,23 +124,21 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -179,7 +174,7 @@ public enum Status private final AuthorizerMapper authorizerMapper; private final Optional chatHandlerProvider; - private final Map endOffsets = new ConcurrentHashMap<>(); + private final Map endOffsets; private final Map nextOffsets = new ConcurrentHashMap<>(); private final Map maxEndOffsets = new HashMap<>(); private final Map lastPersistedOffsets = new ConcurrentHashMap<>(); @@ -192,7 +187,6 @@ public enum Status private volatile DateTime startTime; private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread) private volatile Thread runThread = null; - private volatile File sequencesPersistFile = null; private final AtomicBoolean stopRequested = new AtomicBoolean(false); private final AtomicBoolean publishOnStop = new AtomicBoolean(false); @@ -232,20 +226,17 @@ public enum Status private final Object statusLock = new Object(); private volatile boolean pauseRequested = false; - private volatile long pauseMillis = 0; // This value can be tuned in some tests private long pollRetryMs = 30000; private final Set publishingSequences = Sets.newConcurrentHashSet(); - private final BlockingQueue publishQueue = new LinkedBlockingQueue<>(); - private final List> handOffWaitList = new CopyOnWriteArrayList<>(); // to prevent concurrency visibility issue - private final CountDownLatch waitForPublishes = new CountDownLatch(1); - private final AtomicReference throwableAtomicReference = new AtomicReference<>(); + private final List> publishWaitList = new LinkedList<>(); + private final List> handOffWaitList = new LinkedList<>(); private final String topic; private volatile CopyOnWriteArrayList sequences; - private ListeningExecutorService publishExecService; + private volatile Throwable backgroundThreadException; private final boolean useLegacy; @JsonCreator @@ -274,7 +265,7 @@ public KafkaIndexTask( this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); this.authorizerMapper = authorizerMapper; - this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap()); + this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndPartitions().getPartitionOffsetMap()); this.maxEndOffsets.putAll(endOffsets.entrySet() .stream() .collect(Collectors.toMap( @@ -343,75 +334,6 @@ public KafkaIOConfig getIOConfig() return ioConfig; } - private void createAndStartPublishExecutor() - { - publishExecService = MoreExecutors.listeningDecorator(Execs.singleThreaded("publish-driver")); - publishExecService.submit( - (Runnable) () -> { - while (true) { - try { - final SequenceMetadata sequenceMetadata = publishQueue.take(); - - Preconditions.checkNotNull(driver); - - if (sequenceMetadata.isSentinel()) { - waitForPublishes.countDown(); - break; - } - - log.info("Publishing segments for sequence [%s]", sequenceMetadata); - - final SegmentsAndMetadata result = driver.publish( - sequenceMetadata.getPublisher(toolbox, ioConfig.isUseTransaction()), - sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(), - ImmutableList.of(sequenceMetadata.getSequenceName()) - ).get(); - - if (result == null) { - throw new ISE( - "Transaction failure publishing segments for sequence [%s]", - sequenceMetadata - ); - } else { - log.info( - "Published segments[%s] with metadata[%s].", - Joiner.on(", ").join( - result.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()) - ), - Preconditions.checkNotNull(result.getCommitMetadata(), "commitMetadata") - ); - } - - sequences.remove(sequenceMetadata); - publishingSequences.remove(sequenceMetadata.getSequenceName()); - try { - persistSequences(); - } - catch (IOException e) { - log.error(e, "Unable to persist state, dying"); - Throwables.propagate(e); - } - - final ListenableFuture handOffFuture = driver.registerHandoff(result); - handOffWaitList.add(handOffFuture); - } - catch (Throwable t) { - if ((t instanceof InterruptedException || (t instanceof RejectedExecutionException - && t.getCause() instanceof InterruptedException))) { - log.warn("Stopping publish thread as we are interrupted, probably we are shutting down"); - } else { - log.makeAlert(t, "Error in publish thread, dying").emit(); - throwableAtomicReference.set(t); - } - Futures.allAsList(handOffWaitList).cancel(true); - waitForPublishes.countDown(); - break; - } - } - } - ); - } - @Override public TaskStatus run(final TaskToolbox toolbox) throws Exception { @@ -426,47 +348,40 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception status = Status.STARTING; this.toolbox = toolbox; - if (getContext() != null && getContext().get("checkpoints") != null) { - log.info("Got checkpoints [%s]", (String) getContext().get("checkpoints")); - final TreeMap> checkpoints = toolbox.getObjectMapper().readValue( - (String) getContext().get("checkpoints"), - new TypeReference>>() - { - } - ); - - Iterator>> sequenceOffsets = checkpoints.entrySet().iterator(); - Map.Entry> previous = sequenceOffsets.next(); - while (sequenceOffsets.hasNext()) { - Map.Entry> current = sequenceOffsets.next(); + if (!restoreSequences()) { + final TreeMap> checkpoints = getCheckPointsFromContext(toolbox, this); + if (checkpoints != null) { + Iterator>> sequenceOffsets = checkpoints.entrySet().iterator(); + Map.Entry> previous = sequenceOffsets.next(); + while (sequenceOffsets.hasNext()) { + Map.Entry> current = sequenceOffsets.next(); + sequences.add(new SequenceMetadata( + previous.getKey(), + StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), + previous.getValue(), + current.getValue(), + true + )); + previous = current; + } sequences.add(new SequenceMetadata( previous.getKey(), StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), previous.getValue(), - current.getValue(), - true + maxEndOffsets, + false + )); + } else { + sequences.add(new SequenceMetadata( + 0, + StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0), + ioConfig.getStartPartitions().getPartitionOffsetMap(), + maxEndOffsets, + false )); - previous = current; } - sequences.add(new SequenceMetadata( - previous.getKey(), - StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), previous.getKey()), - previous.getValue(), - maxEndOffsets, - false - )); - } else { - sequences.add(new SequenceMetadata( - 0, - StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), 0), - ioConfig.getStartPartitions().getPartitionOffsetMap(), - maxEndOffsets, - false - )); - } - sequencesPersistFile = new File(toolbox.getPersistDir(), "sequences.json"); - restoreSequences(); + } log.info("Starting with sequences: %s", sequences); if (chatHandlerProvider.isPresent()) { @@ -504,15 +419,12 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception ) ); - try ( - final KafkaConsumer consumer = newConsumer() - ) { + try (final KafkaConsumer consumer = newConsumer()) { toolbox.getDataSegmentServerAnnouncer().announce(); toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); appenderator = newAppenderator(fireDepartmentMetrics, toolbox); driver = newDriver(appenderator, toolbox, fireDepartmentMetrics); - createAndStartPublishExecutor(); final String topic = ioConfig.getStartPartitions().getTopic(); @@ -602,7 +514,7 @@ public void run() status = Status.READING; try { while (stillReading) { - if (possiblyPause(assignment)) { + if (possiblyPause()) { // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign // partitions upon resuming. This is safe even if the end offsets have not been modified. assignment = assignPartitionsAndSeekToNext(consumer, topic); @@ -615,8 +527,7 @@ public void run() } // if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true - if (stopRequested.get() || (sequences.get(sequences.size() - 1).isCheckpointed() - && !ioConfig.isPauseAfterRead())) { + if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) { status = Status.PUBLISHING; } @@ -624,11 +535,12 @@ public void run() break; } - checkAndMaybeThrowException(); - - if (!ioConfig.isPauseAfterRead()) { - maybePersistAndPublishSequences(committerSupplier); + if (backgroundThreadException != null) { + throw new RuntimeException(backgroundThreadException); } + checkPublishAndHandoffFailure(); + + maybePersistAndPublishSequences(committerSupplier); // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to // offset is not present in the topic-partition. This can happen if we're asking a task to read from data @@ -640,19 +552,17 @@ public void run() catch (OffsetOutOfRangeException e) { log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage()); possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox); - stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); + stillReading = !assignment.isEmpty(); } SequenceMetadata sequenceToCheckpoint = null; for (ConsumerRecord record : records) { - if (log.isTraceEnabled()) { - log.trace( - "Got topic[%s] partition[%d] offset[%,d].", - record.topic(), - record.partition(), - record.offset() - ); - } + log.trace( + "Got topic[%s] partition[%d] offset[%,d].", + record.topic(), + record.partition(), + record.offset() + ); if (record.offset() < endOffsets.get(record.partition())) { if (record.offset() != nextOffsets.get(record.partition())) { @@ -680,24 +590,23 @@ public void run() : parser.parseBatch(ByteBuffer.wrap(valueBytes)); boolean isPersistRequired = false; - for (InputRow row : rows) { - if (row != null && withinMinMaxRecordTime(row)) { - SequenceMetadata sequenceToUse = null; - for (SequenceMetadata sequence : sequences) { - if (sequence.canHandle(record)) { - sequenceToUse = sequence; - } - } + final SequenceMetadata sequenceToUse = sequences + .stream() + .filter(sequenceMetadata -> sequenceMetadata.canHandle(record)) + .findFirst() + .orElse(null); - if (sequenceToUse == null) { - throw new ISE( - "WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s", - record.partition(), - record.offset(), - sequences - ); - } + if (sequenceToUse == null) { + throw new ISE( + "WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s", + record.partition(), + record.offset(), + sequences + ); + } + for (InputRow row : rows) { + if (row != null && withinMinMaxRecordTime(row)) { final AppenderatorDriverAddResult addResult = driver.add( row, sequenceToUse.getSequenceName(), @@ -751,7 +660,7 @@ public void onSuccess(@Nullable Object result) public void onFailure(Throwable t) { log.error("Persist failed, dying"); - throwableAtomicReference.set(t); + backgroundThreadException = t; } } ); @@ -779,11 +688,11 @@ public void onFailure(Throwable t) && assignment.remove(record.partition())) { log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); assignPartitions(consumer, topic, assignment); - stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); + stillReading = !assignment.isEmpty(); } } - if (sequenceToCheckpoint != null && !ioConfig.isPauseAfterRead()) { + if (sequenceToCheckpoint != null && stillReading) { Preconditions.checkArgument( sequences.get(sequences.size() - 1) .getSequenceName() @@ -792,7 +701,7 @@ public void onFailure(Throwable t) sequenceToCheckpoint, sequences ); - requestPause(PAUSE_FOREVER); + requestPause(); if (!toolbox.getTaskActionClient().submit(new CheckPointDataSourceMetadataAction( getDataSource(), ioConfig.getBaseSequenceName(), @@ -804,6 +713,11 @@ public void onFailure(Throwable t) } } } + catch (Exception e) { + // (1) catch all exceptions while reading from kafka + log.error(e, "Encountered exception in run() before persisting."); + throw e; + } finally { log.info("Persisting all pending data"); driver.persist(committerSupplier.get()); // persist pending data @@ -824,16 +738,23 @@ public void onFailure(Throwable t) sequenceMetadata.updateAssignments(nextOffsets); publishingSequences.add(sequenceMetadata.getSequenceName()); // persist already done in finally, so directly add to publishQueue - publishQueue.add(sequenceMetadata); + publishAndRegisterHandoff(sequenceMetadata); } } - // add Sentinel SequenceMetadata to indicate end of all sequences - publishQueue.add(SequenceMetadata.getSentinelSequenceMetadata()); - waitForPublishes.await(); - checkAndMaybeThrowException(); + if (backgroundThreadException != null) { + throw new RuntimeException(backgroundThreadException); + } + + // Wait for publish futures to complete. + Futures.allAsList(publishWaitList).get(); - List handedOffList = Lists.newArrayList(); + // Wait for handoff futures to complete. + // Note that every publishing task (created by calling AppenderatorDriver.publish()) has a corresponding + // handoffFuture. handoffFuture can throw an exception if 1) the corresponding publishFuture failed or 2) it + // failed to persist sequences. It might also return null if handoff failed, but was recoverable. + // See publishAndRegisterHandoff() for details. + List handedOffList = Collections.emptyList(); if (tuningConfig.getHandoffConditionTimeout() == 0) { handedOffList = Futures.allAsList(handOffWaitList).get(); } else { @@ -842,6 +763,8 @@ public void onFailure(Throwable t) .get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); } catch (TimeoutException e) { + // Handoff timeout is not an indexing failure, but coordination failure. We simply ignore timeout exception + // here. log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout()) .addData("TaskId", this.getId()) .emit(); @@ -861,8 +784,14 @@ public void onFailure(Throwable t) ); } } + + appenderator.close(); } catch (InterruptedException | RejectedExecutionException e) { + // (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including + // the final publishing. + Futures.allAsList(publishWaitList).cancel(true); + Futures.allAsList(handOffWaitList).cancel(true); appenderator.closeNow(); // handle the InterruptedException that gets wrapped in a RejectedExecutionException if (e instanceof RejectedExecutionException @@ -878,14 +807,14 @@ public void onFailure(Throwable t) log.info("The task was asked to stop before completing"); } + catch (Exception e) { + // (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing. + Futures.allAsList(publishWaitList).cancel(true); + Futures.allAsList(handOffWaitList).cancel(true); + appenderator.closeNow(); + throw e; + } finally { - if (appenderator != null) { - if (throwableAtomicReference.get() != null) { - appenderator.closeNow(); - } else { - appenderator.close(); - } - } if (driver != null) { driver.close(); } @@ -893,10 +822,6 @@ public void onFailure(Throwable t) chatHandlerProvider.get().unregister(getId()); } - if (publishExecService != null) { - publishExecService.shutdownNow(); - } - toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); toolbox.getDataSegmentServerAnnouncer().unannounce(); } @@ -947,9 +872,9 @@ private TaskStatus runLegacy(final TaskToolbox toolbox) throws Exception ); try ( - final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox); - final StreamAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics); - final KafkaConsumer consumer = newConsumer() + final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox); + final StreamAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics); + final KafkaConsumer consumer = newConsumer() ) { toolbox.getDataSegmentServerAnnouncer().announce(); toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); @@ -1032,7 +957,7 @@ public void run() status = Status.READING; try { while (stillReading) { - if (possiblyPause(assignment)) { + if (possiblyPause()) { // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign // partitions upon resuming. This is safe even if the end offsets have not been modified. assignment = assignPartitionsAndSeekToNext(consumer, topic); @@ -1058,7 +983,7 @@ public void run() catch (OffsetOutOfRangeException e) { log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage()); possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox); - stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); + stillReading = !assignment.isEmpty(); } for (ConsumerRecord record : records) { @@ -1158,7 +1083,7 @@ public void run() && assignment.remove(record.partition())) { log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); assignPartitions(consumer, topic, assignment); - stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); + stillReading = !assignment.isEmpty(); } } } @@ -1211,32 +1136,37 @@ public void run() sequenceNames.values() ).get(); + final List publishedSegments = published.getSegments() + .stream() + .map(DataSegment::getIdentifier) + .collect(Collectors.toList()); + log.info( + "Published segments[%s] with metadata[%s].", + publishedSegments, + Preconditions.checkNotNull(published.getCommitMetadata(), "commitMetadata") + ); + final Future handoffFuture = driver.registerHandoff(published); - final SegmentsAndMetadata handedOff; + SegmentsAndMetadata handedOff = null; if (tuningConfig.getHandoffConditionTimeout() == 0) { handedOff = handoffFuture.get(); } else { - handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); + try { + handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); + } + catch (TimeoutException e) { + log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout()) + .addData("TaskId", getId()) + .emit(); + } } if (handedOff == null) { - throw new ISE("Transaction failure publishing segments, aborting"); + log.warn("Failed to handoff segments[%s]", publishedSegments); } else { log.info( - "Published segments[%s] with metadata[%s].", - Joiner.on(", ").join( - Iterables.transform( - handedOff.getSegments(), - new Function() - { - @Override - public String apply(DataSegment input) - { - return input.getIdentifier(); - } - } - ) - ), + "Handoff completed for segments[%s] with metadata[%s]", + handedOff.getSegments().stream().map(DataSegment::getIdentifier).collect(Collectors.toList()), Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata") ); } @@ -1268,13 +1198,156 @@ public String apply(DataSegment input) return success(); } - private void checkAndMaybeThrowException() + private void checkPublishAndHandoffFailure() throws ExecutionException, InterruptedException + { + // Check if any publishFuture failed. + final List> publishFinished = publishWaitList + .stream() + .filter(Future::isDone) + .collect(Collectors.toList()); + + for (ListenableFuture publishFuture : publishFinished) { + // If publishFuture failed, the below line will throw an exception and catched by (1), and then (2) or (3). + publishFuture.get(); + } + + publishWaitList.removeAll(publishFinished); + + // Check if any handoffFuture failed. + final List> handoffFinished = handOffWaitList + .stream() + .filter(Future::isDone) + .collect(Collectors.toList()); + + for (ListenableFuture handoffFuture : handoffFinished) { + // If handoffFuture failed, the below line will throw an exception and catched by (1), and then (2) or (3). + handoffFuture.get(); + } + + handOffWaitList.removeAll(handoffFinished); + } + + private void publishAndRegisterHandoff(SequenceMetadata sequenceMetadata) + { + log.info("Publishing segments for sequence [%s]", sequenceMetadata); + + final ListenableFuture publishFuture = Futures.transform( + driver.publish( + sequenceMetadata.createPublisher(toolbox, ioConfig.isUseTransaction()), + sequenceMetadata.getCommitterSupplier(topic, lastPersistedOffsets).get(), + Collections.singletonList(sequenceMetadata.getSequenceName()) + ), + (Function) publishedSegmentsAndMetadata -> { + if (publishedSegmentsAndMetadata == null) { + throw new ISE( + "Transaction failure publishing segments for sequence [%s]", + sequenceMetadata + ); + } else { + return publishedSegmentsAndMetadata; + } + } + ); + publishWaitList.add(publishFuture); + + // Create a handoffFuture for every publishFuture. The created handoffFuture must fail if publishFuture fails. + final SettableFuture handoffFuture = SettableFuture.create(); + handOffWaitList.add(handoffFuture); + + Futures.addCallback( + publishFuture, + new FutureCallback() + { + @Override + public void onSuccess(SegmentsAndMetadata publishedSegmentsAndMetadata) + { + log.info( + "Published segments[%s] with metadata[%s].", + publishedSegmentsAndMetadata.getSegments() + .stream() + .map(DataSegment::getIdentifier) + .collect(Collectors.toList()), + Preconditions.checkNotNull(publishedSegmentsAndMetadata.getCommitMetadata(), "commitMetadata") + ); + + sequences.remove(sequenceMetadata); + publishingSequences.remove(sequenceMetadata.getSequenceName()); + try { + persistSequences(); + } + catch (IOException e) { + log.error(e, "Unable to persist state, dying"); + handoffFuture.setException(e); + throw new RuntimeException(e); + } + + Futures.transform( + driver.registerHandoff(publishedSegmentsAndMetadata), + new Function() + { + @Nullable + @Override + public Void apply(@Nullable SegmentsAndMetadata handoffSegmentsAndMetadata) + { + if (handoffSegmentsAndMetadata == null) { + log.warn( + "Failed to handoff segments[%s]", + publishedSegmentsAndMetadata.getSegments() + .stream() + .map(DataSegment::getIdentifier) + .collect(Collectors.toList()) + ); + } + handoffFuture.set(handoffSegmentsAndMetadata); + return null; + } + } + ); + } + + @Override + public void onFailure(Throwable t) + { + log.error(t, "Error while publishing segments for sequence[%s]", sequenceMetadata); + handoffFuture.setException(t); + } + } + ); + } + + private static File getSequencesPersistFile(TaskToolbox toolbox) + { + return new File(toolbox.getPersistDir(), "sequences.json"); + } + + private boolean restoreSequences() throws IOException { - if (throwableAtomicReference.get() != null) { - Throwables.propagate(throwableAtomicReference.get()); + final File sequencesPersistFile = getSequencesPersistFile(toolbox); + if (sequencesPersistFile.exists()) { + sequences = new CopyOnWriteArrayList<>( + toolbox.getObjectMapper().>readValue( + sequencesPersistFile, + new TypeReference>() + { + } + ) + ); + return true; + } else { + return false; } } + private synchronized void persistSequences() throws IOException + { + log.info("Persisting Sequences Metadata [%s]", sequences); + toolbox.getObjectMapper().writerWithType( + new TypeReference>() + { + } + ).writeValue(getSequencesPersistFile(toolbox), sequences); + } + private void maybePersistAndPublishSequences(Supplier committerSupplier) throws InterruptedException { @@ -1289,7 +1362,7 @@ private void maybePersistAndPublishSequences(Supplier committerSuppli result, sequenceMetadata ); - publishQueue.add(sequenceMetadata); + publishAndRegisterHandoff(sequenceMetadata); } catch (InterruptedException e) { log.warn("Interrupted while persisting sequence [%s]", sequenceMetadata); @@ -1299,73 +1372,194 @@ private void maybePersistAndPublishSequences(Supplier committerSuppli } } - private void restoreSequences() throws IOException + private Set assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic) { - Preconditions.checkNotNull(sequencesPersistFile); - if (sequencesPersistFile.exists()) { - sequences = new CopyOnWriteArrayList<>(toolbox.getObjectMapper().>readValue( - sequencesPersistFile, new TypeReference>() - { - })); + // Initialize consumer assignment. + final Set assignment = Sets.newHashSet(); + for (Map.Entry entry : nextOffsets.entrySet()) { + final long endOffset = endOffsets.get(entry.getKey()); + if (entry.getValue() < endOffset) { + assignment.add(entry.getKey()); + } else if (entry.getValue() == endOffset) { + log.info("Finished reading partition[%d].", entry.getKey()); + } else { + throw new ISE( + "WTF?! Cannot start from offset[%,d] > endOffset[%,d]", + entry.getValue(), + endOffset + ); + } } - } - private synchronized void persistSequences() throws IOException - { - log.info("Persisting Sequences Metadata [%s]", sequences); - toolbox.getObjectMapper().writerWithType( - new TypeReference>() - { - } - ).writeValue(sequencesPersistFile, sequences); - } + KafkaIndexTask.assignPartitions(consumer, topic, assignment); - @Override - public boolean canRestore() - { - return true; + // Seek to starting offsets. + for (final int partition : assignment) { + final long offset = nextOffsets.get(partition); + log.info("Seeking partition[%d] to offset[%,d].", partition, offset); + consumer.seek(new TopicPartition(topic, partition), offset); + } + + return assignment; } /** - * Authorizes action to be performed on this task's datasource + * Checks if the pauseRequested flag was set and if so blocks until pauseRequested is cleared. + *

+ * Sets paused = true and signals paused so callers can be notified when the pause command has been accepted. + *

* - * @return authorization result + * @return true if a pause request was handled, false otherwise */ - private Access authorizationCheck(final HttpServletRequest req, Action action) + private boolean possiblyPause() throws InterruptedException { - ResourceAction resourceAction = new ResourceAction( - new Resource(dataSchema.getDataSource(), ResourceType.DATASOURCE), - action - ); + pauseLock.lockInterruptibly(); + try { + if (pauseRequested) { + status = Status.PAUSED; + hasPaused.signalAll(); - Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper); - if (!access.isAllowed()) { - throw new ForbiddenException(access.toString()); - } + while (pauseRequested) { + log.info("Pausing ingestion until resumed"); + shouldResume.await(); + } - return access; - } + status = Status.READING; + shouldResume.signalAll(); + log.info("Ingestion loop resumed"); + return true; + } + } + finally { + pauseLock.unlock(); + } - @VisibleForTesting - Appenderator getAppenderator() - { - return appenderator; + return false; } - @Override - public void stopGracefully() + private void possiblyResetOffsetsOrWait( + Map outOfRangePartitions, + KafkaConsumer consumer, + TaskToolbox taskToolbox + ) throws InterruptedException, IOException { - log.info("Stopping gracefully (status: [%s])", status); - stopRequested.set(true); - - synchronized (statusLock) { - if (status == Status.PUBLISHING) { - runThread.interrupt(); - return; + final Map resetPartitions = Maps.newHashMap(); + boolean doReset = false; + if (tuningConfig.isResetOffsetAutomatically()) { + for (Map.Entry outOfRangePartition : outOfRangePartitions.entrySet()) { + final TopicPartition topicPartition = outOfRangePartition.getKey(); + final long nextOffset = outOfRangePartition.getValue(); + // seek to the beginning to get the least available offset + consumer.seekToBeginning(Collections.singletonList(topicPartition)); + final long leastAvailableOffset = consumer.position(topicPartition); + // reset the seek + consumer.seek(topicPartition, nextOffset); + // Reset consumer offset if resetOffsetAutomatically is set to true + // and the current message offset in the kafka partition is more than the + // next message offset that we are trying to fetch + if (leastAvailableOffset > nextOffset) { + doReset = true; + resetPartitions.put(topicPartition, nextOffset); + } } } - try { + if (doReset) { + sendResetRequestAndWait(resetPartitions, taskToolbox); + } else { + log.warn("Retrying in %dms", pollRetryMs); + pollRetryLock.lockInterruptibly(); + try { + long nanos = TimeUnit.MILLISECONDS.toNanos(pollRetryMs); + while (nanos > 0L && !pauseRequested && !stopRequested.get()) { + nanos = isAwaitingRetry.awaitNanos(nanos); + } + } + finally { + pollRetryLock.unlock(); + } + } + } + + private void requestPause() + { + pauseRequested = true; + } + + private void sendResetRequestAndWait(Map outOfRangePartitions, TaskToolbox taskToolbox) + throws IOException + { + Map partitionOffsetMap = Maps.newHashMap(); + for (Map.Entry outOfRangePartition : outOfRangePartitions.entrySet()) { + partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue()); + } + boolean result = taskToolbox.getTaskActionClient() + .submit(new ResetDataSourceMetadataAction( + getDataSource(), + new KafkaDataSourceMetadata(new KafkaPartitions( + ioConfig.getStartPartitions() + .getTopic(), + partitionOffsetMap + )) + )); + + if (result) { + log.makeAlert("Resetting Kafka offsets for datasource [%s]", getDataSource()) + .addData("partitions", partitionOffsetMap.keySet()) + .emit(); + // wait for being killed by supervisor + requestPause(); + } else { + log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit(); + } + } + + @Override + public boolean canRestore() + { + return true; + } + + /** + * Authorizes action to be performed on this task's datasource + * + * @return authorization result + */ + private Access authorizationCheck(final HttpServletRequest req, Action action) + { + ResourceAction resourceAction = new ResourceAction( + new Resource(dataSchema.getDataSource(), ResourceType.DATASOURCE), + action + ); + + Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper); + if (!access.isAllowed()) { + throw new ForbiddenException(access.toString()); + } + + return access; + } + + @VisibleForTesting + Appenderator getAppenderator() + { + return appenderator; + } + + @Override + public void stopGracefully() + { + log.info("Stopping gracefully (status: [%s])", status); + stopRequested.set(true); + + synchronized (statusLock) { + if (status == Status.PUBLISHING) { + runThread.interrupt(); + return; + } + } + + try { if (pauseLock.tryLock(LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { try { if (pauseRequested) { @@ -1474,25 +1668,23 @@ public Map getEndOffsets() @Produces(MediaType.APPLICATION_JSON) public Response setEndOffsetsHTTP( Map offsets, - @QueryParam("resume") @DefaultValue("false") final boolean resume, @QueryParam("finish") @DefaultValue("true") final boolean finish, // this field is only for internal purposes, shouldn't be usually set by users @Context final HttpServletRequest req ) throws InterruptedException { authorizationCheck(req, Action.WRITE); - return setEndOffsets(offsets, resume, finish); + return setEndOffsets(offsets, finish); } public Response setEndOffsets( Map offsets, - final boolean resume, final boolean finish // this field is only for internal purposes, shouldn't be usually set by users ) throws InterruptedException { // for backwards compatibility, should be removed from versions greater than 0.12.x if (useLegacy) { - return setEndOffsetsLegacy(offsets, resume); + return setEndOffsetsLegacy(offsets); } if (offsets == null) { @@ -1520,7 +1712,7 @@ public Response setEndOffsets( (latestSequence.getEndOffsets().equals(offsets) && finish)) { log.warn("Ignoring duplicate request, end offsets already set for sequences [%s]", sequences); return Response.ok(offsets).build(); - } else if (latestSequence.isCheckpointed() && !ioConfig.isPauseAfterRead()) { + } else if (latestSequence.isCheckpointed()) { return Response.status(Response.Status.BAD_REQUEST) .entity(StringUtils.format( "WTH?! Sequence [%s] has already endOffsets set, cannot set to [%s]", @@ -1553,13 +1745,12 @@ public Response setEndOffsets( log.info("Updating endOffsets from [%s] to [%s]", endOffsets, offsets); endOffsets.putAll(offsets); } else { - Preconditions.checkState(!ioConfig.isPauseAfterRead()); // create new sequence final SequenceMetadata newSequence = new SequenceMetadata( latestSequence.getSequenceId() + 1, StringUtils.format("%s_%d", ioConfig.getBaseSequenceName(), latestSequence.getSequenceId() + 1), offsets, - maxEndOffsets, + endOffsets, false ); sequences.add(newSequence); @@ -1569,77 +1760,23 @@ public Response setEndOffsets( } catch (Exception e) { log.error(e, "Unable to set end offsets, dying"); - throwableAtomicReference.set(e); - Throwables.propagate(e); + backgroundThreadException = e; + // should resume to immediately finish kafka index task as failed + resume(); + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(Throwables.getStackTraceAsString(e)) + .build(); } finally { pauseLock.unlock(); } } - if (resume) { - resume(); - } + resume(); return Response.ok(offsets).build(); } - private Response setEndOffsetsLegacy( - Map offsets, - final boolean resume - ) throws InterruptedException - { - if (offsets == null) { - return Response.status(Response.Status.BAD_REQUEST) - .entity("Request body must contain a map of { partition:endOffset }") - .build(); - } else if (!endOffsets.keySet().containsAll(offsets.keySet())) { - return Response.status(Response.Status.BAD_REQUEST) - .entity( - StringUtils.format( - "Request contains partitions not being handled by this task, my partitions: %s", - endOffsets.keySet() - ) - ) - .build(); - } - - pauseLock.lockInterruptibly(); - try { - if (!isPaused()) { - return Response.status(Response.Status.BAD_REQUEST) - .entity("Task must be paused before changing the end offsets") - .build(); - } - - for (Map.Entry entry : offsets.entrySet()) { - if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) { - return Response.status(Response.Status.BAD_REQUEST) - .entity( - StringUtils.format( - "End offset must be >= current offset for partition [%s] (current: %s)", - entry.getKey(), - nextOffsets.get(entry.getKey()) - ) - ) - .build(); - } - } - - endOffsets.putAll(offsets); - log.info("endOffsets changed to %s", endOffsets); - } - finally { - pauseLock.unlock(); - } - - if (resume) { - resume(); - } - - return Response.ok(endOffsets).build(); - } - @GET @Path("/checkpoints") @Produces(MediaType.APPLICATION_JSON) @@ -1649,7 +1786,7 @@ public Map> getCheckpointsHTTP(@Context final HttpSe return getCheckpoints(); } - public Map> getCheckpoints() + private Map> getCheckpoints() { TreeMap> result = new TreeMap<>(); result.putAll( @@ -1661,8 +1798,6 @@ public Map> getCheckpoints() /** * Signals the ingestion loop to pause. * - * @param timeout how long to pause for before resuming in milliseconds, <= 0 means indefinitely - * * @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the * method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets * in the response body if the task successfully paused @@ -1671,15 +1806,14 @@ public Map> getCheckpoints() @Path("/pause") @Produces(MediaType.APPLICATION_JSON) public Response pauseHTTP( - @QueryParam("timeout") @DefaultValue("0") final long timeout, @Context final HttpServletRequest req ) throws InterruptedException { authorizationCheck(req, Action.WRITE); - return pause(timeout); + return pause(); } - public Response pause(final long timeout) throws InterruptedException + public Response pause() throws InterruptedException { if (!(status == Status.PAUSED || status == Status.READING)) { return Response.status(Response.Status.BAD_REQUEST) @@ -1689,7 +1823,6 @@ public Response pause(final long timeout) throws InterruptedException pauseLock.lockInterruptibly(); try { - pauseMillis = timeout <= 0 ? PAUSE_FOREVER : timeout; pauseRequested = true; pollRetryLock.lockInterruptibly(); @@ -1764,6 +1897,79 @@ public DateTime getStartTime(@Context final HttpServletRequest req) return startTime; } + @Nullable + private static TreeMap> getCheckPointsFromContext( + TaskToolbox toolbox, + KafkaIndexTask task + ) throws IOException + { + final String checkpointsString = task.getContextValue("checkpoints"); + if (checkpointsString != null) { + log.info("Checkpoints [%s]", checkpointsString); + return toolbox.getObjectMapper().readValue( + checkpointsString, + new TypeReference>>() + { + } + ); + } else { + return null; + } + } + + private Response setEndOffsetsLegacy( + Map offsets + ) throws InterruptedException + { + if (offsets == null) { + return Response.status(Response.Status.BAD_REQUEST) + .entity("Request body must contain a map of { partition:endOffset }") + .build(); + } else if (!endOffsets.keySet().containsAll(offsets.keySet())) { + return Response.status(Response.Status.BAD_REQUEST) + .entity( + StringUtils.format( + "Request contains partitions not being handled by this task, my partitions: %s", + endOffsets.keySet() + ) + ) + .build(); + } + + pauseLock.lockInterruptibly(); + try { + if (!isPaused()) { + return Response.status(Response.Status.BAD_REQUEST) + .entity("Task must be paused before changing the end offsets") + .build(); + } + + for (Map.Entry entry : offsets.entrySet()) { + if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) { + return Response.status(Response.Status.BAD_REQUEST) + .entity( + StringUtils.format( + "End offset must be >= current offset for partition [%s] (current: %s)", + entry.getKey(), + nextOffsets.get(entry.getKey()) + ) + ) + .build(); + } + } + + endOffsets.putAll(offsets); + log.info("endOffsets changed to %s", endOffsets); + } + finally { + pauseLock.unlock(); + } + + resume(); + + return Response.ok(endOffsets).build(); + } + @VisibleForTesting FireDepartmentMetrics getFireDepartmentMetrics() { @@ -1775,12 +1981,6 @@ private boolean isPaused() return status == Status.PAUSED; } - private void requestPause(long pauseMillis) - { - this.pauseMillis = pauseMillis; - pauseRequested = true; - } - private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox) { return Appenderators.createRealtime( @@ -1854,169 +2054,6 @@ private static void assignPartitions( ); } - private Set assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic) - { - // Initialize consumer assignment. - final Set assignment = Sets.newHashSet(); - for (Map.Entry entry : nextOffsets.entrySet()) { - final long endOffset = endOffsets.get(entry.getKey()); - if (entry.getValue() < endOffset) { - assignment.add(entry.getKey()); - } else if (entry.getValue() == endOffset) { - log.info("Finished reading partition[%d].", entry.getKey()); - } else { - throw new ISE( - "WTF?! Cannot start from offset[%,d] > endOffset[%,d]", - entry.getValue(), - endOffset - ); - } - } - - assignPartitions(consumer, topic, assignment); - - // Seek to starting offsets. - for (final int partition : assignment) { - final long offset = nextOffsets.get(partition); - log.info("Seeking partition[%d] to offset[%,d].", partition, offset); - consumer.seek(new TopicPartition(topic, partition), offset); - } - - return assignment; - } - - /** - * Checks if the pauseRequested flag was set and if so blocks: - * a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared - * b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared - *

- * If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the - * pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume - * and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal - * shouldResume after adjusting pauseMillis for the new value to take effect. - *

- * Sets paused = true and signals paused so callers can be notified when the pause command has been accepted. - *

- * Additionally, pauses if all partitions assignments have been read and pauseAfterRead flag is set. - * - * @return true if a pause request was handled, false otherwise - */ - private boolean possiblyPause(Set assignment) throws InterruptedException - { - pauseLock.lockInterruptibly(); - try { - if (ioConfig.isPauseAfterRead() && assignment.isEmpty()) { - pauseMillis = PAUSE_FOREVER; - pauseRequested = true; - } - - if (pauseRequested) { - status = Status.PAUSED; - long nanos = 0; - hasPaused.signalAll(); - - while (pauseRequested) { - if (pauseMillis == PAUSE_FOREVER) { - log.info("Pausing ingestion until resumed"); - shouldResume.await(); - } else { - if (pauseMillis > 0) { - log.info("Pausing ingestion for [%,d] ms", pauseMillis); - nanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis); - pauseMillis = 0; - } - if (nanos <= 0L) { - pauseRequested = false; // timeout elapsed - } - nanos = shouldResume.awaitNanos(nanos); - } - } - - status = Status.READING; - shouldResume.signalAll(); - log.info("Ingestion loop resumed"); - return true; - } - } - finally { - pauseLock.unlock(); - } - - return false; - } - - private void possiblyResetOffsetsOrWait( - Map outOfRangePartitions, - KafkaConsumer consumer, - TaskToolbox taskToolbox - ) throws InterruptedException, IOException - { - final Map resetPartitions = Maps.newHashMap(); - boolean doReset = false; - if (tuningConfig.isResetOffsetAutomatically()) { - for (Map.Entry outOfRangePartition : outOfRangePartitions.entrySet()) { - final TopicPartition topicPartition = outOfRangePartition.getKey(); - final long nextOffset = outOfRangePartition.getValue(); - // seek to the beginning to get the least available offset - consumer.seekToBeginning(Collections.singletonList(topicPartition)); - final long leastAvailableOffset = consumer.position(topicPartition); - // reset the seek - consumer.seek(topicPartition, nextOffset); - // Reset consumer offset if resetOffsetAutomatically is set to true - // and the current message offset in the kafka partition is more than the - // next message offset that we are trying to fetch - if (leastAvailableOffset > nextOffset) { - doReset = true; - resetPartitions.put(topicPartition, nextOffset); - } - } - } - - if (doReset) { - sendResetRequestAndWait(resetPartitions, taskToolbox); - } else { - log.warn("Retrying in %dms", pollRetryMs); - pollRetryLock.lockInterruptibly(); - try { - long nanos = TimeUnit.MILLISECONDS.toNanos(pollRetryMs); - while (nanos > 0L && !pauseRequested && !stopRequested.get()) { - nanos = isAwaitingRetry.awaitNanos(nanos); - } - } - finally { - pollRetryLock.unlock(); - } - } - } - - private void sendResetRequestAndWait(Map outOfRangePartitions, TaskToolbox taskToolbox) - throws IOException - { - Map partitionOffsetMap = Maps.newHashMap(); - for (Map.Entry outOfRangePartition : outOfRangePartitions.entrySet()) { - partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue()); - } - boolean result = taskToolbox.getTaskActionClient() - .submit(new ResetDataSourceMetadataAction( - getDataSource(), - new KafkaDataSourceMetadata(new KafkaPartitions( - ioConfig.getStartPartitions() - .getTopic(), - partitionOffsetMap - )) - )); - - if (result) { - log.makeAlert("Resetting Kafka offsets for datasource [%s]", getDataSource()) - .addData("partitions", partitionOffsetMap.keySet()) - .emit(); - // wait for being killed by supervisor - requestPause(PAUSE_FOREVER); - } else { - log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit(); - } - } - private boolean withinMinMaxRecordTime(final InputRow row) { final boolean beforeMinimumMessageTime = ioConfig.getMinimumMessageTime().isPresent() @@ -2142,7 +2179,7 @@ public boolean isSentinel() return sentinel; } - public void setEndOffsets(Map newEndOffsets) + void setEndOffsets(Map newEndOffsets) { lock.lock(); try { @@ -2154,15 +2191,14 @@ public void setEndOffsets(Map newEndOffsets) } } - public void updateAssignments(Map nextPartitionOffset) + void updateAssignments(Map nextPartitionOffset) { lock.lock(); try { assignments.clear(); - nextPartitionOffset.entrySet().forEach(partitionOffset -> { - if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey())) - > 0) { - assignments.add(partitionOffset.getKey()); + nextPartitionOffset.forEach((key, value) -> { + if (Longs.compare(endOffsets.get(key), nextPartitionOffset.get(key)) > 0) { + assignments.add(key); } }); } @@ -2171,7 +2207,7 @@ public void updateAssignments(Map nextPartitionOffset) } } - public boolean isOpen() + boolean isOpen() { return !assignments.isEmpty(); } @@ -2180,32 +2216,17 @@ boolean canHandle(ConsumerRecord record) { lock.lock(); try { + final Long partitionEndOffset = endOffsets.get(record.partition()); return isOpen() - && endOffsets.get(record.partition()) != null + && partitionEndOffset != null && record.offset() >= startOffsets.get(record.partition()) - && record.offset() < endOffsets.get(record.partition()); + && record.offset() < partitionEndOffset; } finally { lock.unlock(); } } - private SequenceMetadata() - { - this.sequenceId = -1; - this.sequenceName = null; - this.startOffsets = null; - this.endOffsets = null; - this.assignments = null; - this.checkpointed = true; - this.sentinel = true; - } - - public static SequenceMetadata getSentinelSequenceMetadata() - { - return new SequenceMetadata(); - } - @Override public String toString() { @@ -2226,8 +2247,7 @@ public String toString() } } - - public Supplier getCommitterSupplier(String topic, Map lastPersistedOffsets) + Supplier getCommitterSupplier(String topic, Map lastPersistedOffsets) { // Set up committer. return () -> @@ -2280,7 +2300,7 @@ public void run() }; } - public TransactionalSegmentPublisher getPublisher(TaskToolbox toolbox, boolean useTransaction) + TransactionalSegmentPublisher createPublisher(TaskToolbox toolbox, boolean useTransaction) { return (segments, commitMetadata) -> { final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java index 6525d1276318..1c2098055059 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskClient.java @@ -29,22 +29,22 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.java.util.emitter.EmittingLogger; -import io.druid.java.util.http.client.HttpClient; -import io.druid.java.util.http.client.Request; -import io.druid.java.util.http.client.response.FullResponseHandler; -import io.druid.java.util.http.client.response.FullResponseHolder; +import io.druid.indexer.TaskLocation; import io.druid.indexing.common.RetryPolicy; import io.druid.indexing.common.RetryPolicyConfig; import io.druid.indexing.common.RetryPolicyFactory; import io.druid.indexing.common.TaskInfoProvider; -import io.druid.indexer.TaskLocation; import io.druid.indexing.common.TaskStatus; import io.druid.java.util.common.IAE; import io.druid.java.util.common.IOE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.concurrent.Execs; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.FullResponseHandler; +import io.druid.java.util.http.client.response.FullResponseHolder; import io.druid.segment.realtime.firehose.ChatHandlerResource; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.handler.codec.http.HttpMethod; @@ -166,19 +166,14 @@ public boolean resume(final String id) public Map pause(final String id) { - return pause(id, 0); - } - - public Map pause(final String id, final long timeout) - { - log.debug("Pause task[%s] timeout[%d]", id, timeout); + log.debug("Pause task[%s]", id); try { final FullResponseHolder response = submitRequest( id, HttpMethod.POST, "pause", - timeout > 0 ? StringUtils.format("timeout=%d", timeout) : null, + null, true ); @@ -320,18 +315,17 @@ public Map getEndOffsets(final String id) public boolean setEndOffsets( final String id, final Map endOffsets, - final boolean resume, final boolean finalize ) { - log.debug("SetEndOffsets task[%s] endOffsets[%s] resume[%s] finalize[%s]", id, endOffsets, resume, finalize); + log.debug("SetEndOffsets task[%s] endOffsets[%s] finalize[%s]", id, endOffsets, finalize); try { final FullResponseHolder response = submitRequest( id, HttpMethod.POST, "offsets/end", - StringUtils.format("resume=%s&finish=%s", resume, finalize), + StringUtils.format("finish=%s", finalize), jsonMapper.writeValueAsBytes(endOffsets), true ); @@ -374,11 +368,6 @@ public Boolean call() throws Exception } public ListenableFuture> pauseAsync(final String id) - { - return pauseAsync(id, 0); - } - - public ListenableFuture> pauseAsync(final String id, final long timeout) { return executorService.submit( new Callable>() @@ -386,7 +375,7 @@ public ListenableFuture> pauseAsync(final String id, final lo @Override public Map call() throws Exception { - return pause(id, timeout); + return pause(id); } } ); @@ -449,7 +438,7 @@ public Map call() throws Exception } public ListenableFuture setEndOffsetsAsync( - final String id, final Map endOffsets, final boolean resume, final boolean finalize + final String id, final Map endOffsets, final boolean finalize ) { return executorService.submit( @@ -458,7 +447,7 @@ public ListenableFuture setEndOffsetsAsync( @Override public Boolean call() throws Exception { - return setEndOffsets(id, endOffsets, resume, finalize); + return setEndOffsets(id, endOffsets, finalize); } } ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index f79b297d064b..454deffe886d 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -1517,7 +1517,7 @@ public Map apply(List> input) log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", groupId, endOffsets); for (final String taskId : setEndOffsetTaskIds) { - setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, true, finalize)); + setEndOffsetFutures.add(taskClient.setEndOffsetsAsync(taskId, endOffsets, finalize)); } List results = Futures.successfulAsList(setEndOffsetFutures) @@ -1764,7 +1764,6 @@ private void createKafkaTasksForGroup(int groupId, int replicas) throws JsonProc new KafkaPartitions(ioConfig.getTopic(), endPartitions), consumerProperties, true, - false, minimumMessageTime, maximumMessageTime, ioConfig.isSkipOffsetGaps() diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java index 49a9b90033d9..22a792f7e6f2 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIOConfigTest.java @@ -71,8 +71,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals("mytopic", config.getEndPartitions().getTopic()); Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap()); Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties()); - Assert.assertEquals(true, config.isUseTransaction()); - Assert.assertEquals(false, config.isPauseAfterRead()); + Assert.assertTrue(config.isUseTransaction()); Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent()); Assert.assertFalse("maximumMessageTime", config.getMaximumMessageTime().isPresent()); Assert.assertFalse("skipOffsetGaps", config.isSkipOffsetGaps()); @@ -88,7 +87,6 @@ public void testSerdeWithNonDefaults() throws Exception + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\",\n" + " \"skipOffsetGaps\": true\n" @@ -109,8 +107,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals("mytopic", config.getEndPartitions().getTopic()); Assert.assertEquals(ImmutableMap.of(0, 15L, 1, 200L), config.getEndPartitions().getPartitionOffsetMap()); Assert.assertEquals(ImmutableMap.of("bootstrap.servers", "localhost:9092"), config.getConsumerProperties()); - Assert.assertEquals(false, config.isUseTransaction()); - Assert.assertEquals(true, config.isPauseAfterRead()); + Assert.assertFalse(config.isUseTransaction()); Assert.assertEquals(DateTimes.of("2016-05-31T12:00Z"), config.getMinimumMessageTime().get()); Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get()); Assert.assertTrue("skipOffsetGaps", config.isSkipOffsetGaps()); @@ -125,7 +122,6 @@ public void testBaseSequenceNameRequired() throws Exception + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -145,7 +141,6 @@ public void testStartPartitionsRequired() throws Exception + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -165,7 +160,6 @@ public void testEndPartitionsRequired() throws Exception + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -185,7 +179,6 @@ public void testConsumerPropertiesRequired() throws Exception + " \"startPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":1, \"1\":10}},\n" + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -206,7 +199,6 @@ public void testStartAndEndTopicMatch() throws Exception + " \"endPartitions\": {\"topic\":\"other\", \"partitionOffsetMap\" : {\"0\":15, \"1\":200}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -227,7 +219,6 @@ public void testStartAndEndPartitionSetMatch() throws Exception + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; @@ -248,7 +239,6 @@ public void testEndOffsetGreaterThanStart() throws Exception + " \"endPartitions\": {\"topic\":\"mytopic\", \"partitionOffsetMap\" : {\"0\":15, \"1\":2}},\n" + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + " \"useTransaction\": false,\n" - + " \"pauseAfterRead\": true,\n" + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " \"maximumMessageTime\": \"2016-05-31T14:00Z\"\n" + "}"; diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java index 2834cc838a89..b63563b8eda0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskClientTest.java @@ -27,17 +27,17 @@ import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import io.druid.java.util.http.client.HttpClient; -import io.druid.java.util.http.client.Request; -import io.druid.java.util.http.client.response.FullResponseHandler; -import io.druid.java.util.http.client.response.FullResponseHolder; -import io.druid.indexing.common.TaskInfoProvider; import io.druid.indexer.TaskLocation; +import io.druid.indexing.common.TaskInfoProvider; import io.druid.indexing.common.TaskStatus; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.IAE; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.http.client.HttpClient; +import io.druid.java.util.http.client.Request; +import io.druid.java.util.http.client.response.FullResponseHandler; +import io.druid.java.util.http.client.response.FullResponseHolder; import org.easymock.Capture; import org.easymock.CaptureType; import org.easymock.EasyMockSupport; @@ -56,6 +56,7 @@ import java.io.IOException; import java.net.URL; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -139,13 +140,13 @@ public void testNoTaskLocation() throws Exception Assert.assertEquals(false, client.stop(TEST_ID, true)); Assert.assertEquals(false, client.resume(TEST_ID)); Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID)); - Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID, 10)); + Assert.assertEquals(ImmutableMap.of(), client.pause(TEST_ID)); Assert.assertEquals(KafkaIndexTask.Status.NOT_STARTED, client.getStatus(TEST_ID)); Assert.assertEquals(null, client.getStartTime(TEST_ID)); Assert.assertEquals(ImmutableMap.of(), client.getCurrentOffsets(TEST_ID, true)); Assert.assertEquals(ImmutableMap.of(), client.getEndOffsets(TEST_ID)); - Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.of(), false, true)); - Assert.assertEquals(false, client.setEndOffsets(TEST_ID, ImmutableMap.of(), true, true)); + Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true)); + Assert.assertEquals(false, client.setEndOffsets(TEST_ID, Collections.emptyMap(), true)); verifyAll(); } @@ -431,33 +432,6 @@ public void testPause() throws Exception Assert.assertEquals(10, (long) results.get(1)); } - @Test - public void testPauseWithTimeout() throws Exception - { - Capture captured = Capture.newInstance(); - expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2); - expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes(); - expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn( - Futures.immediateFuture(responseHolder) - ); - replayAll(); - - Map results = client.pause(TEST_ID, 101); - verifyAll(); - - Request request = captured.getValue(); - Assert.assertEquals(HttpMethod.POST, request.getMethod()); - Assert.assertEquals( - new URL("http://test-host:1234/druid/worker/v1/chat/test-id/pause?timeout=101"), - request.getUrl() - ); - Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id")); - - Assert.assertEquals(2, results.size()); - Assert.assertEquals(1, (long) results.get(0)); - Assert.assertEquals(10, (long) results.get(1)); - } - @Test public void testPauseWithSubsequentGetOffsets() throws Exception { @@ -544,13 +518,13 @@ public void testSetEndOffsets() throws Exception ); replayAll(); - client.setEndOffsets(TEST_ID, endOffsets, false, true); + client.setEndOffsets(TEST_ID, endOffsets, true); verifyAll(); Request request = captured.getValue(); Assert.assertEquals(HttpMethod.POST, request.getMethod()); Assert.assertEquals( - new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=false&finish=true"), + new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"), request.getUrl() ); Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id")); @@ -569,13 +543,13 @@ public void testSetEndOffsetsAndResume() throws Exception ); replayAll(); - client.setEndOffsets(TEST_ID, endOffsets, true, true); + client.setEndOffsets(TEST_ID, endOffsets, true); verifyAll(); Request request = captured.getValue(); Assert.assertEquals(HttpMethod.POST, request.getMethod()); Assert.assertEquals( - new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?resume=true&finish=true"), + new URL("http://test-host:1234/druid/worker/v1/chat/test-id/offsets/end?finish=true"), request.getUrl() ); Assert.assertTrue(request.getHeaders().get("X-Druid-Task-Id").contains("test-id")); @@ -723,39 +697,6 @@ public void testPauseAsync() throws Exception } } - @Test - public void testPauseAsyncWithTimeout() throws Exception - { - final int numRequests = TEST_IDS.size(); - Capture captured = Capture.newInstance(CaptureType.ALL); - expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); - expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes(); - expect(httpClient.go(capture(captured), anyObject(FullResponseHandler.class), eq(TEST_HTTP_TIMEOUT))).andReturn( - Futures.immediateFuture(responseHolder) - ).times(numRequests); - replayAll(); - - List expectedUrls = Lists.newArrayList(); - List>> futures = Lists.newArrayList(); - for (String testId : TEST_IDS) { - expectedUrls.add(new URL(StringUtils.format(URL_FORMATTER, TEST_HOST, TEST_PORT, testId, "pause?timeout=9"))); - futures.add(client.pauseAsync(testId, 9)); - } - - List> responses = Futures.allAsList(futures).get(); - - verifyAll(); - List requests = captured.getValues(); - - Assert.assertEquals(numRequests, requests.size()); - Assert.assertEquals(numRequests, responses.size()); - for (int i = 0; i < numRequests; i++) { - Assert.assertEquals(HttpMethod.POST, requests.get(i).getMethod()); - Assert.assertTrue("unexpectedURL", expectedUrls.contains(requests.get(i).getUrl())); - Assert.assertEquals(Maps.newLinkedHashMap(ImmutableMap.of(0, 1L)), responses.get(i)); - } - } - @Test public void testGetStatusAsync() throws Exception { @@ -909,9 +850,9 @@ public void testSetEndOffsetsAsync() throws Exception TEST_HOST, TEST_PORT, testId, - StringUtils.format("offsets/end?resume=%s&finish=%s", false, true) + StringUtils.format("offsets/end?finish=%s", true) ))); - futures.add(client.setEndOffsetsAsync(testId, endOffsets, false, true)); + futures.add(client.setEndOffsetsAsync(testId, endOffsets, true)); } List responses = Futures.allAsList(futures).get(); @@ -950,11 +891,11 @@ public void testSetEndOffsetsAsyncWithResume() throws Exception TEST_HOST, TEST_PORT, testId, - "offsets/end?resume=true&finish=true" + "offsets/end?finish=true" ) ) ); - futures.add(client.setEndOffsetsAsync(testId, endOffsets, true, true)); + futures.add(client.setEndOffsetsAsync(testId, endOffsets, true)); } List responses = Futures.allAsList(futures).get(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 45f3003638f0..4232b582217c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -38,10 +38,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import io.druid.java.util.emitter.EmittingLogger; -import io.druid.java.util.emitter.core.NoopEmitter; -import io.druid.java.util.emitter.service.ServiceEmitter; -import io.druid.java.util.metrics.MonitorScheduler; import io.druid.client.cache.CacheConfig; import io.druid.client.cache.MapCache; import io.druid.data.input.impl.DimensionsSpec; @@ -85,6 +81,10 @@ import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.parsers.JSONPathFieldSpec; import io.druid.java.util.common.parsers.JSONPathSpec; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.emitter.core.NoopEmitter; +import io.druid.java.util.emitter.service.ServiceEmitter; +import io.druid.java.util.metrics.MonitorScheduler; import io.druid.math.expr.ExprMacroTable; import io.druid.metadata.DerbyMetadataStorageActionHandlerFactory; import io.druid.metadata.EntryExistsException; @@ -350,7 +350,6 @@ public void testRunAfterDataInserted() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -392,7 +391,6 @@ public void testRunBeforeDataInserted() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -468,7 +466,6 @@ public void testIncrementalHandOff() throws Exception endPartitions, consumerProps, true, - false, null, null, false @@ -481,7 +478,7 @@ public void testIncrementalHandOff() throws Exception final Map currentOffsets = ImmutableMap.copyOf(task.getCurrentOffsets()); Assert.assertTrue(checkpoint1.getPartitionOffsetMap().equals(currentOffsets) || checkpoint2.getPartitionOffsetMap() .equals(currentOffsets)); - task.setEndOffsets(currentOffsets, true, false); + task.setEndOffsets(currentOffsets, false); Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); Assert.assertEquals(1, checkpointRequestsHash.size()); @@ -536,7 +533,6 @@ public void testRunWithMinimumMessageTime() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, DateTimes.of("2010"), null, false @@ -590,7 +586,6 @@ public void testRunWithMaximumMessageTime() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, DateTimes.of("2010"), false @@ -654,7 +649,6 @@ public void testRunWithTransformSpec() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -714,7 +708,6 @@ public void testRunOnNothing() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 2L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -755,7 +748,6 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -807,7 +799,6 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -817,10 +808,7 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio final ListenableFuture future = runTask(task); // Wait for task to exit - Assert.assertEquals( - isIncrementalHandoffSupported ? TaskState.SUCCESS : TaskState.FAILED, - future.get().getStatusCode() - ); + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); // Check metrics Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); @@ -861,7 +849,6 @@ public void testReportParseExceptions() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 7L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -894,7 +881,6 @@ public void testRunReplicas() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -908,7 +894,6 @@ public void testRunReplicas() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -962,7 +947,6 @@ public void testRunConflicting() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -976,7 +960,6 @@ public void testRunConflicting() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 9L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1031,7 +1014,6 @@ public void testRunConflictingWithoutTransactions() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), false, - false, null, null, false @@ -1045,7 +1027,6 @@ public void testRunConflictingWithoutTransactions() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 9L)), kafkaServer.consumerProperties(), false, - false, null, null, false @@ -1105,7 +1086,6 @@ public void testRunOneTaskTwoPartitions() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L, 1, 2L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1170,7 +1150,6 @@ public void testRunTwoTasksTwoPartitions() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1184,7 +1163,6 @@ public void testRunTwoTasksTwoPartitions() throws Exception new KafkaPartitions(topic, ImmutableMap.of(1, 1L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1240,7 +1218,6 @@ public void testRestore() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1277,7 +1254,6 @@ public void testRestore() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1329,7 +1305,6 @@ public void testRunWithPauseAndResume() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1354,7 +1329,7 @@ public void testRunWithPauseAndResume() throws Exception Assert.assertEquals(KafkaIndexTask.Status.READING, task.getStatus()); Map currentOffsets = objectMapper.readValue( - task.pause(0).getEntity().toString(), + task.pause().getEntity().toString(), new TypeReference>() { } @@ -1402,93 +1377,6 @@ public void testRunWithPauseAndResume() throws Exception Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc2)); } - @Test(timeout = 60_000L) - public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception - { - final KafkaIndexTask task = createTask( - null, - new KafkaIOConfig( - "sequence0", - new KafkaPartitions(topic, ImmutableMap.of(0, 1L)), - new KafkaPartitions(topic, ImmutableMap.of(0, 3L)), - kafkaServer.consumerProperties(), - true, - true, - null, - null, - false - ) - ); - - final ListenableFuture future = runTask(task); - - try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { - for (ProducerRecord record : records) { - kafkaProducer.send(record).get(); - } - } - - while (task.getStatus() != KafkaIndexTask.Status.PAUSED) { - Thread.sleep(25); - } - - // reached the end of the assigned offsets and paused instead of publishing - Assert.assertEquals(task.getEndOffsets(), task.getCurrentOffsets()); - Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); - - Assert.assertEquals(ImmutableMap.of(0, 3L), task.getEndOffsets()); - Map newEndOffsets = ImmutableMap.of(0, 4L); - task.setEndOffsets(newEndOffsets, false, true); - Assert.assertEquals(newEndOffsets, task.getEndOffsets()); - Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); - task.resume(); - - while (task.getStatus() != KafkaIndexTask.Status.PAUSED) { - Thread.sleep(25); - } - - // reached the end of the updated offsets and paused - Assert.assertEquals(newEndOffsets, task.getCurrentOffsets()); - Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); - - // try again but with resume flag == true - newEndOffsets = ImmutableMap.of(0, 7L); - task.setEndOffsets(newEndOffsets, true, true); - Assert.assertEquals(newEndOffsets, task.getEndOffsets()); - Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); - - while (task.getStatus() != KafkaIndexTask.Status.PAUSED) { - Thread.sleep(25); - } - - Assert.assertEquals(newEndOffsets, task.getCurrentOffsets()); - Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); - - task.resume(); - - Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); - - // Check metrics - Assert.assertEquals(4, task.getFireDepartmentMetrics().processed()); - Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable()); - Assert.assertEquals(1, task.getFireDepartmentMetrics().thrownAway()); - - // Check published metadata - SegmentDescriptor desc1 = SD(task, "2009/P1D", 0); - SegmentDescriptor desc2 = SD(task, "2010/P1D", 0); - SegmentDescriptor desc3 = SD(task, "2011/P1D", 0); - Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors()); - Assert.assertEquals( - new KafkaDataSourceMetadata(new KafkaPartitions(topic, ImmutableMap.of(0, 7L))), - metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) - ); - - // Check segments in deep storage - Assert.assertEquals(ImmutableList.of("b"), readSegmentColumn("dim1", desc1)); - Assert.assertEquals(ImmutableList.of("c"), readSegmentColumn("dim1", desc2)); - Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentColumn("dim1", desc3)); - } - @Test(timeout = 30_000L) public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception { @@ -1500,7 +1388,6 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1513,7 +1400,7 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception Thread.sleep(2000); } - task.pause(0); + task.pause(); while (!task.getStatus().equals(KafkaIndexTask.Status.PAUSED)) { Thread.sleep(25); @@ -1539,7 +1426,6 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva new KafkaPartitions(topic, ImmutableMap.of(0, 500L)), kafkaServer.consumerProperties(), true, - false, null, null, false @@ -1593,7 +1479,6 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception new KafkaPartitions(topic, ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), true, - false, null, null, false diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index c429265c697c..60a47f0d58c0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -254,7 +254,6 @@ public void testNoInitialState() throws Exception Assert.assertEquals("myCustomValue", taskConfig.getConsumerProperties().get("myCustomKey")); Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); - Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead()); Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent()); Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent()); Assert.assertFalse("skipOffsetGaps", taskConfig.isSkipOffsetGaps()); @@ -1046,7 +1045,6 @@ public void testBeginPublishAndQueueNextTasks() throws Exception taskClient.setEndOffsetsAsync( EasyMock.contains("sequenceName-0"), EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)), - EasyMock.eq(true), EasyMock.eq(true) ) ).andReturn(Futures.immediateFuture(true)).times(2); @@ -1074,7 +1072,6 @@ public void testBeginPublishAndQueueNextTasks() throws Exception KafkaIOConfig taskConfig = kafkaIndexTask.getIOConfig(); Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); - Assert.assertFalse("pauseAfterRead", taskConfig.isPauseAfterRead()); Assert.assertEquals(topic, taskConfig.getStartPartitions().getTopic()); Assert.assertEquals(10L, (long) taskConfig.getStartPartitions().getPartitionOffsetMap().get(0)); @@ -1164,7 +1161,6 @@ public void testDiscoverExistingPublishingTask() throws Exception Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey")); Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName()); Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction()); - Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead()); // check that the new task was created with starting offsets matching where the publishing task finished Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic()); @@ -1255,7 +1251,6 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() Assert.assertEquals("myCustomValue", capturedTaskConfig.getConsumerProperties().get("myCustomKey")); Assert.assertEquals("sequenceName-0", capturedTaskConfig.getBaseSequenceName()); Assert.assertTrue("isUseTransaction", capturedTaskConfig.isUseTransaction()); - Assert.assertFalse("pauseAfterRead", capturedTaskConfig.isPauseAfterRead()); // check that the new task was created with starting offsets matching where the publishing task finished Assert.assertEquals(topic, capturedTaskConfig.getStartPartitions().getTopic()); @@ -1570,7 +1565,6 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception taskClient.setEndOffsetsAsync( EasyMock.contains("sequenceName-0"), EasyMock.eq(ImmutableMap.of(0, 10L, 1, 20L, 2, 35L)), - EasyMock.eq(true), EasyMock.eq(true) ) ).andReturn(Futures.immediateFailedFuture(new RuntimeException())).times(2); @@ -1695,7 +1689,7 @@ public void testStopGracefully() throws Exception expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); expect(taskClient.pauseAsync("id2")) .andReturn(Futures.immediateFuture((Map) ImmutableMap.of(0, 15L, 1, 25L, 2, 30L))); - expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true, true)) + expect(taskClient.setEndOffsetsAsync("id2", ImmutableMap.of(0, 15L, 1, 25L, 2, 30L), true)) .andReturn(Futures.immediateFuture(true)); taskQueue.shutdown("id3"); expectLastCall().times(2); @@ -2125,7 +2119,6 @@ private KafkaIndexTask createKafkaIndexTask( endPartitions, ImmutableMap.of(), true, - false, minimumMessageTime, maximumMessageTime, false diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 8e18d725e436..aeb05781ec5b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -179,6 +179,7 @@ default int getPriority() */ TaskStatus run(TaskToolbox toolbox) throws Exception; + @Nullable Map getContext(); @Nullable diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java index 72a7f8b325c4..74a1011abc57 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SegmentsAndMetadata.java @@ -23,12 +23,13 @@ import io.druid.timeline.DataSegment; import javax.annotation.Nullable; +import java.util.Collections; import java.util.List; import java.util.Objects; public class SegmentsAndMetadata { - private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(ImmutableList.of(), null); + private static final SegmentsAndMetadata NIL = new SegmentsAndMetadata(Collections.emptyList(), null); private final Object commitMetadata; private final ImmutableList segments; diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java index 6d1109202eaa..33b823df88f2 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java @@ -23,7 +23,6 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; -import com.google.common.base.Throwables; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -221,7 +220,7 @@ public Object persist(final Committer committer) throws InterruptedException throw e; } catch (Exception e) { - throw Throwables.propagate(e); + throw new RuntimeException(e); } }