diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 314f99cb385f..b0b0c6be7470 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -28,7 +28,6 @@
 import org.apache.druid.indexing.common.task.TaskResource;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
 import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
-import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -129,28 +128,15 @@ static void assignPartitions(
   @Override
   protected SeekableStreamIndexTaskRunner createTaskRunner()
   {
-    if (context != null && context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED) != null
-        && ((boolean) context.get(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED))) {
-      //noinspection unchecked
-      return new IncrementalPublishingKafkaIndexTaskRunner(
-          this,
-          dataSchema.getParser(),
-          authorizerMapper,
-          chatHandlerProvider,
-          savedParseExceptions,
-          rowIngestionMetersFactory
-      );
-    } else {
-      //noinspection unchecked
-      return new LegacyKafkaIndexTaskRunner(
-          this,
-          dataSchema.getParser(),
-          authorizerMapper,
-          chatHandlerProvider,
-          savedParseExceptions,
-          rowIngestionMetersFactory
-      );
-    }
+    //noinspection unchecked
+    return new IncrementalPublishingKafkaIndexTaskRunner(
+        this,
+        dataSchema.getParser(),
+        authorizerMapper,
+        chatHandlerProvider,
+        savedParseExceptions,
+        rowIngestionMetersFactory
+    );
   }
 
   @Override
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
deleted file mode 100644
index 5b186c44ca0c..000000000000
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/LegacyKafkaIndexTaskRunner.java
+++ /dev/null
@@ -1,1239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */ - -package org.apache.druid.indexing.kafka; - -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Supplier; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; -import org.apache.druid.data.input.Committer; -import org.apache.druid.data.input.InputRow; -import org.apache.druid.data.input.impl.InputRowParser; -import org.apache.druid.discovery.DiscoveryDruidNode; -import org.apache.druid.discovery.LookupNodeService; -import org.apache.druid.discovery.NodeType; -import org.apache.druid.indexer.IngestionState; -import org.apache.druid.indexer.TaskStatus; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReport; -import org.apache.druid.indexing.common.IngestionStatsAndErrorsTaskReportData; -import org.apache.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder; -import org.apache.druid.indexing.common.TaskReport; -import org.apache.druid.indexing.common.TaskToolbox; -import org.apache.druid.indexing.common.actions.ResetDataSourceMetadataAction; -import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; -import org.apache.druid.indexing.common.stats.RowIngestionMeters; -import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; -import org.apache.druid.indexing.common.task.IndexTaskUtils; -import org.apache.druid.indexing.common.task.RealtimeIndexTask; -import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; -import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; -import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; -import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers; -import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; -import org.apache.druid.indexing.seekablestream.SequenceMetadata; -import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; -import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; -import org.apache.druid.indexing.seekablestream.common.RecordSupplier; -import org.apache.druid.indexing.seekablestream.common.StreamPartition; -import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.collect.Utils; -import org.apache.druid.java.util.common.parsers.ParseException; -import org.apache.druid.java.util.emitter.EmittingLogger; -import org.apache.druid.segment.indexing.RealtimeIOConfig; -import org.apache.druid.segment.realtime.FireDepartment; -import org.apache.druid.segment.realtime.FireDepartmentMetrics; -import org.apache.druid.segment.realtime.appenderator.Appenderator; -import org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; -import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; -import org.apache.druid.segment.realtime.appenderator.SegmentsAndMetadata; -import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver; -import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; -import 
org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; -import org.apache.druid.server.security.Access; -import org.apache.druid.server.security.Action; -import org.apache.druid.server.security.AuthorizerMapper; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.utils.CircularBuffer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; -import org.apache.kafka.common.TopicPartition; -import org.joda.time.DateTime; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.Consumes; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -/** - * Kafka index task runner which doesn't support incremental segment publishing. We keep this to support rolling update. - * This class will be removed in a future release. - */ -@Deprecated -public class LegacyKafkaIndexTaskRunner extends SeekableStreamIndexTaskRunner -{ - private static final EmittingLogger log = new EmittingLogger(LegacyKafkaIndexTaskRunner.class); - private static final String METADATA_NEXT_PARTITIONS = "nextPartitions"; - - private final ConcurrentMap endOffsets = new ConcurrentHashMap<>(); - private final ConcurrentMap nextOffsets = new ConcurrentHashMap<>(); - - // The pause lock and associated conditions are to support coordination between the Jetty threads and the main - // ingestion loop. The goal is to provide callers of the API a guarantee that if pause() returns successfully - // the ingestion loop has been stopped at the returned offsets and will not ingest any more data until resumed. The - // fields are used as follows (every step requires acquiring [pauseLock]): - // Pausing: - // - In pause(), [pauseRequested] is set to true and then execution waits for [status] to change to PAUSED, with the - // condition checked when [hasPaused] is signalled. - // - In possiblyPause() called from the main loop, if [pauseRequested] is true, [status] is set to PAUSED, - // [hasPaused] is signalled, and execution pauses until [pauseRequested] becomes false, either by being set or by - // the [pauseMillis] timeout elapsing. [pauseRequested] is checked when [shouldResume] is signalled. - // Resuming: - // - In resume(), [pauseRequested] is set to false, [shouldResume] is signalled, and execution waits for [status] to - // change to something other than PAUSED, with the condition checked when [shouldResume] is signalled. 
- // - In possiblyPause(), when [shouldResume] is signalled, if [pauseRequested] has become false the pause loop ends, - // [status] is changed to STARTING and [shouldResume] is signalled. - - private final Lock pauseLock = new ReentrantLock(); - private final Condition hasPaused = pauseLock.newCondition(); - private final Condition shouldResume = pauseLock.newCondition(); - - private final AtomicBoolean stopRequested = new AtomicBoolean(false); - private final AtomicBoolean publishOnStop = new AtomicBoolean(false); - - // [statusLock] is used to synchronize the Jetty thread calling stopGracefully() with the main run thread. It prevents - // the main run thread from switching into a publishing state while the stopGracefully() thread thinks it's still in - // a pre-publishing state. This is important because stopGracefully() will try to use the [stopRequested] flag to stop - // the main thread where possible, but this flag is not honored once publishing has begun so in this case we must - // interrupt the thread. The lock ensures that if the run thread is about to transition into publishing state, it - // blocks until after stopGracefully() has set [stopRequested] and then does a final check on [stopRequested] before - // transitioning to publishing state. - private final Object statusLock = new Object(); - - private final Lock pollRetryLock = new ReentrantLock(); - private final Condition isAwaitingRetry = pollRetryLock.newCondition(); - - private final KafkaIndexTask task; - private final KafkaIndexTaskIOConfig ioConfig; - private final KafkaIndexTaskTuningConfig tuningConfig; - private final InputRowParser parser; - private final AuthorizerMapper authorizerMapper; - private final Optional chatHandlerProvider; - private final CircularBuffer savedParseExceptions; - private final RowIngestionMeters rowIngestionMeters; - - private volatile DateTime startTime; - private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread) - private volatile ObjectMapper objectMapper; - private volatile Thread runThread; - private volatile Appenderator appenderator; - private volatile StreamAppenderatorDriver driver; - private volatile FireDepartmentMetrics fireDepartmentMetrics; - private volatile IngestionState ingestionState; - - private volatile boolean pauseRequested; - - LegacyKafkaIndexTaskRunner( - KafkaIndexTask task, - InputRowParser parser, - AuthorizerMapper authorizerMapper, - Optional chatHandlerProvider, - CircularBuffer savedParseExceptions, - RowIngestionMetersFactory rowIngestionMetersFactory - ) - { - super( - task, - parser, - authorizerMapper, - chatHandlerProvider, - savedParseExceptions, - rowIngestionMetersFactory - ); - this.task = task; - this.ioConfig = task.getIOConfig(); - this.tuningConfig = task.getTuningConfig(); - this.parser = parser; - this.authorizerMapper = authorizerMapper; - this.chatHandlerProvider = chatHandlerProvider; - this.savedParseExceptions = savedParseExceptions; - this.rowIngestionMeters = rowIngestionMetersFactory.createRowIngestionMeters(); - - this.endOffsets.putAll(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap()); - this.ingestionState = IngestionState.NOT_STARTED; - } - - @Override - public TaskStatus run(TaskToolbox toolbox) - { - try { - return runInternal(toolbox); - } - catch (Exception e) { - log.error(e, "Encountered exception while running task."); - final String errorMsg = Throwables.getStackTraceAsString(e); - 
toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(errorMsg)); - return TaskStatus.failure( - task.getId(), - errorMsg - ); - } - } - - @Override - public Appenderator getAppenderator() - { - return appenderator; - } - - @Override - public RowIngestionMeters getRowIngestionMeters() - { - return rowIngestionMeters; - } - - private TaskStatus runInternal(TaskToolbox toolbox) throws Exception - { - log.info("Starting up!"); - startTime = DateTimes.nowUtc(); - status = Status.STARTING; - objectMapper = toolbox.getObjectMapper(); - - if (chatHandlerProvider.isPresent()) { - log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName()); - chatHandlerProvider.get().register(task.getId(), this, false); - } else { - log.warn("No chat handler detected"); - } - - runThread = Thread.currentThread(); - - // Set up FireDepartmentMetrics - final FireDepartment fireDepartmentForMetrics = new FireDepartment( - task.getDataSchema(), - new RealtimeIOConfig(null, null, null), - null - ); - fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics(); - toolbox.getMonitorScheduler() - .addMonitor(TaskRealtimeMetricsMonitorBuilder.build(task, fireDepartmentForMetrics, rowIngestionMeters)); - - final String lookupTier = task.getContextValue(RealtimeIndexTask.CTX_KEY_LOOKUP_TIER); - LookupNodeService lookupNodeService = lookupTier == null ? - toolbox.getLookupNodeService() : - new LookupNodeService(lookupTier); - DiscoveryDruidNode discoveryDruidNode = new DiscoveryDruidNode( - toolbox.getDruidNode(), - NodeType.PEON, - ImmutableMap.of( - toolbox.getDataNodeService().getName(), toolbox.getDataNodeService(), - lookupNodeService.getName(), lookupNodeService - ) - ); - - ingestionState = IngestionState.BUILD_SEGMENTS; - - try ( - final Appenderator appenderator0 = task.newAppenderator(fireDepartmentMetrics, toolbox); - final StreamAppenderatorDriver driver = task.newDriver(appenderator0, toolbox, fireDepartmentMetrics); - final KafkaConsumer consumer = task.newConsumer() - ) { - toolbox.getDataSegmentServerAnnouncer().announce(); - toolbox.getDruidNodeAnnouncer().announce(discoveryDruidNode); - - appenderator = appenderator0; - - final String topic = ioConfig.getStartSequenceNumbers().getStream(); - - // Start up, set up initial offsets. - final Object restoredMetadata = driver.startJob(); - if (restoredMetadata == null) { - nextOffsets.putAll(ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap()); - } else { - final Map restoredMetadataMap = (Map) restoredMetadata; - final SeekableStreamEndSequenceNumbers restoredNextPartitions = toolbox.getObjectMapper().convertValue( - restoredMetadataMap.get(METADATA_NEXT_PARTITIONS), - toolbox.getObjectMapper().getTypeFactory().constructParametrizedType( - SeekableStreamStartSequenceNumbers.class, - SeekableStreamStartSequenceNumbers.class, - Integer.class, - Long.class - ) - ); - nextOffsets.putAll(restoredNextPartitions.getPartitionSequenceNumberMap()); - - // Sanity checks. - if (!restoredNextPartitions.getStream().equals(ioConfig.getStartSequenceNumbers().getStream())) { - throw new ISE( - "WTF?! Restored topic[%s] but expected topic[%s]", - restoredNextPartitions.getStream(), - ioConfig.getStartSequenceNumbers().getStream() - ); - } - - if (!nextOffsets.keySet().equals(ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet())) { - throw new ISE( - "WTF?! 
Restored partitions[%s] but expected partitions[%s]", - nextOffsets.keySet(), - ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet() - ); - } - } - - // Set up sequenceNames. - final Map sequenceNames = new HashMap<>(); - for (Integer partitionNum : nextOffsets.keySet()) { - sequenceNames.put(partitionNum, StringUtils.format("%s_%s", ioConfig.getBaseSequenceName(), partitionNum)); - } - - // Set up committer. - final Supplier committerSupplier = new Supplier() - { - @Override - public Committer get() - { - final Map snapshot = ImmutableMap.copyOf(nextOffsets); - - return new Committer() - { - @Override - public Object getMetadata() - { - return ImmutableMap.of( - METADATA_NEXT_PARTITIONS, - new SeekableStreamEndSequenceNumbers<>( - ioConfig.getStartSequenceNumbers().getStream(), - snapshot - ) - ); - } - - @Override - public void run() - { - // Do nothing. - } - }; - } - }; - - Set assignment = assignPartitionsAndSeekToNext(consumer, topic); - - // Main loop. - // Could eventually support leader/follower mode (for keeping replicas more in sync) - boolean stillReading = !assignment.isEmpty(); - status = Status.READING; - try { - while (stillReading) { - if (possiblyPause()) { - // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign - // partitions upon resuming. This is safe even if the end offsets have not been modified. - assignment = assignPartitionsAndSeekToNext(consumer, topic); - - if (assignment.isEmpty()) { - log.info("All partitions have been fully read"); - publishOnStop.set(true); - stopRequested.set(true); - } - } - - if (stopRequested.get()) { - break; - } - - // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to - // offset is not present in the topic-partition. This can happen if we're asking a task to read from data - // that has not been written yet (which is totally legitimate). So let's wait for it to show up. - ConsumerRecords records = ConsumerRecords.empty(); - try { - records = consumer.poll(task.getIOConfig().getPollTimeout()); - } - catch (OffsetOutOfRangeException e) { - log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage()); - possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox); - stillReading = !assignment.isEmpty(); - } - - for (ConsumerRecord record : records) { - if (log.isTraceEnabled()) { - log.trace( - "Got topic[%s] partition[%d] offset[%,d].", - record.topic(), - record.partition(), - record.offset() - ); - } - - if (record.offset() < endOffsets.get(record.partition())) { - - try { - final byte[] valueBytes = record.value(); - final List rows = valueBytes == null - ? Utils.nullableListOf((InputRow) null) - : parser.parseBatch(ByteBuffer.wrap(valueBytes)); - boolean isPersistRequired = false; - final Map> segmentsToMoveOut = new HashMap<>(); - - for (InputRow row : rows) { - if (row != null && task.withinMinMaxRecordTime(row)) { - final String sequenceName = sequenceNames.get(record.partition()); - final AppenderatorDriverAddResult addResult = driver.add( - row, - sequenceName, - committerSupplier, - false, - false - ); - - if (addResult.isOk()) { - // If the number of rows in the segment exceeds the threshold after adding a row, - // move the segment out from the active segments of BaseAppenderatorDriver to make a new segment. 
- if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) { - segmentsToMoveOut.computeIfAbsent(sequenceName, k -> new HashSet<>()) - .add(addResult.getSegmentIdentifier()); - } - isPersistRequired |= addResult.isPersistRequired(); - } else { - // Failure to allocate segment puts determinism at risk, bail out to be safe. - // May want configurable behavior here at some point. - // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. - throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp()); - } - - if (addResult.getParseException() != null) { - handleParseException(addResult.getParseException(), record); - } else { - rowIngestionMeters.incrementProcessed(); - } - } else { - rowIngestionMeters.incrementThrownAway(); - } - } - - if (isPersistRequired) { - driver.persist(committerSupplier.get()); - } - segmentsToMoveOut.forEach((String sequence, Set segments) -> { - driver.moveSegmentOut(sequence, new ArrayList<>(segments)); - }); - } - catch (ParseException e) { - handleParseException(e, record); - } - - nextOffsets.put(record.partition(), record.offset() + 1); - } - - if (nextOffsets.get(record.partition()) >= (endOffsets.get(record.partition())) - && assignment.remove(record.partition())) { - log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); - KafkaIndexTask.assignPartitions(consumer, topic, assignment); - stillReading = !assignment.isEmpty(); - } - } - } - ingestionState = IngestionState.COMPLETED; - } - catch (Exception e) { - log.error(e, "Encountered exception in runLegacy() before persisting."); - throw e; - } - finally { - driver.persist(committerSupplier.get()); // persist pending data - } - - synchronized (statusLock) { - if (stopRequested.get() && !publishOnStop.get()) { - throw new InterruptedException("Stopping without publishing"); - } - - status = Status.PUBLISHING; - } - - final TransactionalSegmentPublisher publisher = (segments, commitMetadata) -> { - final SeekableStreamEndSequenceNumbers finalPartitions = toolbox.getObjectMapper().convertValue( - ((Map) Preconditions.checkNotNull(commitMetadata, "commitMetadata")).get(METADATA_NEXT_PARTITIONS), - toolbox.getObjectMapper() - .getTypeFactory() - .constructParametrizedType( - SeekableStreamEndSequenceNumbers.class, - SeekableStreamEndSequenceNumbers.class, - Integer.class, - Long.class - ) - ); - - // Sanity check, we should only be publishing things that match our desired end state. - if (!endOffsets.equals(finalPartitions.getPartitionSequenceNumberMap())) { - throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata); - } - - final SegmentTransactionalInsertAction action; - - if (ioConfig.isUseTransaction()) { - action = new SegmentTransactionalInsertAction( - segments, - new KafkaDataSourceMetadata(ioConfig.getStartSequenceNumbers()), - new KafkaDataSourceMetadata(finalPartitions) - ); - } else { - action = new SegmentTransactionalInsertAction(segments, null, null); - } - - log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction()); - - return toolbox.getTaskActionClient().submit(action); - }; - - // Supervised kafka tasks are killed by KafkaSupervisor if they are stuck during publishing segments or waiting - // for hand off. See KafkaSupervisorIOConfig.completionTimeout. 
- final SegmentsAndMetadata published = driver.publish( - publisher, - committerSupplier.get(), - sequenceNames.values() - ).get(); - - List publishedSegmentIds = Lists.transform(published.getSegments(), DataSegment::getId); - log.info( - "Published segments %s with metadata[%s].", - publishedSegmentIds, - Preconditions.checkNotNull(published.getCommitMetadata(), "commitMetadata") - ); - - final Future handoffFuture = driver.registerHandoff(published); - SegmentsAndMetadata handedOff = null; - if (tuningConfig.getHandoffConditionTimeout() == 0) { - handedOff = handoffFuture.get(); - } else { - try { - handedOff = handoffFuture.get(tuningConfig.getHandoffConditionTimeout(), TimeUnit.MILLISECONDS); - } - catch (TimeoutException e) { - log.makeAlert("Timed out after [%d] millis waiting for handoffs", tuningConfig.getHandoffConditionTimeout()) - .addData("TaskId", task.getId()) - .emit(); - } - } - - if (handedOff == null) { - log.warn("Failed to handoff segments %s", publishedSegmentIds); - } else { - log.info( - "Handoff completed for segments %s with metadata[%s]", - Lists.transform(handedOff.getSegments(), DataSegment::getId), - Preconditions.checkNotNull(handedOff.getCommitMetadata(), "commitMetadata") - ); - } - } - catch (InterruptedException | RejectedExecutionException e) { - // handle the InterruptedException that gets wrapped in a RejectedExecutionException - if (e instanceof RejectedExecutionException - && (e.getCause() == null || !(e.getCause() instanceof InterruptedException))) { - throw e; - } - - // if we were interrupted because we were asked to stop, handle the exception and return success, else rethrow - if (!stopRequested.get()) { - Thread.currentThread().interrupt(); - throw e; - } - - log.info("The task was asked to stop before completing"); - } - finally { - if (chatHandlerProvider.isPresent()) { - chatHandlerProvider.get().unregister(task.getId()); - } - - toolbox.getDruidNodeAnnouncer().unannounce(discoveryDruidNode); - toolbox.getDataSegmentServerAnnouncer().unannounce(); - } - - toolbox.getTaskReportFileWriter().write(getTaskCompletionReports(null)); - return TaskStatus.success( - task.getId(), - null - ); - } - - @Override - protected boolean isEndOfShard(Long seqNum) - { - return false; - } - - @Override - public TypeReference>> getSequenceMetadataTypeReference() - { - return new TypeReference>>() - { - }; - } - - @Nonnull - @Override - protected List> getRecords( - RecordSupplier recordSupplier, TaskToolbox toolbox - ) - { - throw new UnsupportedOperationException(); - } - - private Set assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic) - { - // Initialize consumer assignment. - final Set assignment = new HashSet<>(); - for (Map.Entry entry : nextOffsets.entrySet()) { - final long endOffset = endOffsets.get(entry.getKey()); - if (entry.getValue() < endOffset) { - assignment.add(entry.getKey()); - } else if (entry.getValue() == endOffset) { - log.info("Finished reading partition[%d].", entry.getKey()); - } else { - throw new ISE( - "WTF?! Cannot start from offset[%,d] > endOffset[%,d]", - entry.getValue(), - endOffset - ); - } - } - - KafkaIndexTask.assignPartitions(consumer, topic, assignment); - - // Seek to starting offsets. 
- for (final int partition : assignment) { - final long offset = nextOffsets.get(partition); - log.info("Seeking partition[%d] to offset[%,d].", partition, offset); - consumer.seek(new TopicPartition(topic, partition), offset); - } - - return assignment; - } - - /** - * Checks if the pauseRequested flag was set and if so blocks until pauseRequested is cleared. - *

- * Sets paused = true and signals paused so callers can be notified when the pause command has been accepted. - *

- * - * @return true if a pause request was handled, false otherwise - */ - private boolean possiblyPause() throws InterruptedException - { - pauseLock.lockInterruptibly(); - try { - if (pauseRequested) { - status = Status.PAUSED; - hasPaused.signalAll(); - - while (pauseRequested) { - log.info("Pausing ingestion until resumed"); - shouldResume.await(); - } - - status = Status.READING; - shouldResume.signalAll(); - log.info("Ingestion loop resumed"); - return true; - } - } - finally { - pauseLock.unlock(); - } - - return false; - } - - @Override - protected void possiblyResetDataSourceMetadata( - TaskToolbox toolbox, - RecordSupplier recordSupplier, - Set> assignment - ) - { - throw new UnsupportedOperationException(); - } - - @Override - protected boolean isEndOffsetExclusive() - { - return true; - } - - @Override - protected SeekableStreamEndSequenceNumbers deserializePartitionsFromMetadata( - ObjectMapper mapper, - Object object - ) - { - throw new UnsupportedOperationException(); - } - - private void possiblyResetOffsetsOrWait( - Map outOfRangePartitions, - KafkaConsumer consumer, - TaskToolbox taskToolbox - ) throws InterruptedException, IOException - { - final Map resetPartitions = new HashMap<>(); - boolean doReset = false; - if (tuningConfig.isResetOffsetAutomatically()) { - for (Map.Entry outOfRangePartition : outOfRangePartitions.entrySet()) { - final TopicPartition topicPartition = outOfRangePartition.getKey(); - final long nextOffset = outOfRangePartition.getValue(); - // seek to the beginning to get the least available offset - consumer.seekToBeginning(Collections.singletonList(topicPartition)); - final long leastAvailableOffset = consumer.position(topicPartition); - // reset the seek - consumer.seek(topicPartition, nextOffset); - // Reset consumer offset if resetOffsetAutomatically is set to true - // and the current message offset in the kafka partition is more than the - // next message offset that we are trying to fetch - if (leastAvailableOffset > nextOffset) { - doReset = true; - resetPartitions.put(topicPartition, nextOffset); - } - } - } - - if (doReset) { - sendResetRequestAndWaitLegacy(resetPartitions, taskToolbox); - } else { - log.warn("Retrying in %dms", task.getPollRetryMs()); - pollRetryLock.lockInterruptibly(); - try { - long nanos = TimeUnit.MILLISECONDS.toNanos(task.getPollRetryMs()); - while (nanos > 0L && !pauseRequested && !stopRequested.get()) { - nanos = isAwaitingRetry.awaitNanos(nanos); - } - } - finally { - pollRetryLock.unlock(); - } - } - } - - private void sendResetRequestAndWaitLegacy(Map outOfRangePartitions, TaskToolbox taskToolbox) - throws IOException - { - Map partitionOffsetMap = new HashMap<>(); - for (Map.Entry outOfRangePartition : outOfRangePartitions.entrySet()) { - partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue()); - } - boolean result = taskToolbox.getTaskActionClient() - .submit(new ResetDataSourceMetadataAction( - task.getDataSource(), - new KafkaDataSourceMetadata( - new SeekableStreamStartSequenceNumbers<>( - ioConfig.getStartSequenceNumbers().getStream(), - partitionOffsetMap, - Collections.emptySet() - ) - ) - )); - - if (result) { - log.makeAlert("Resetting Kafka offsets for datasource [%s]", task.getDataSource()) - .addData("partitions", partitionOffsetMap.keySet()) - .emit(); - // wait for being killed by supervisor - requestPause(); - } else { - log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit(); - } - } - - private void 
requestPause() - { - pauseRequested = true; - } - - @Override - protected Long getNextStartOffset(Long sequenceNumber) - { - throw new UnsupportedOperationException(); - } - - private void handleParseException(ParseException pe, ConsumerRecord record) - { - if (pe.isFromPartiallyValidRow()) { - rowIngestionMeters.incrementProcessedWithError(); - } else { - rowIngestionMeters.incrementUnparseable(); - } - - if (tuningConfig.isLogParseExceptions()) { - log.error( - pe, - "Encountered parse exception on row from partition[%d] offset[%d]", - record.partition(), - record.offset() - ); - } - - if (savedParseExceptions != null) { - savedParseExceptions.add(pe); - } - - if (rowIngestionMeters.getUnparseable() + rowIngestionMeters.getProcessedWithError() - > tuningConfig.getMaxParseExceptions()) { - log.error("Max parse exceptions exceeded, terminating task..."); - throw new RuntimeException("Max parse exceptions exceeded, terminating task..."); - } - } - - private Map getTaskCompletionReports(@Nullable String errorMsg) - { - return TaskReport.buildTaskReports( - new IngestionStatsAndErrorsTaskReport( - task.getId(), - new IngestionStatsAndErrorsTaskReportData( - ingestionState, - getTaskCompletionUnparseableEvents(), - getTaskCompletionRowStats(), - errorMsg - ) - ) - ); - } - - private Map getTaskCompletionUnparseableEvents() - { - Map unparseableEventsMap = new HashMap<>(); - List buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions( - savedParseExceptions - ); - if (buildSegmentsParseExceptionMessages != null) { - unparseableEventsMap.put(RowIngestionMeters.BUILD_SEGMENTS, buildSegmentsParseExceptionMessages); - } - return unparseableEventsMap; - } - - private Map getTaskCompletionRowStats() - { - Map metrics = new HashMap<>(); - metrics.put( - RowIngestionMeters.BUILD_SEGMENTS, - rowIngestionMeters.getTotals() - ); - return metrics; - } - - @Override - public void stopGracefully() - { - log.info("Stopping gracefully (status: [%s])", status); - stopRequested.set(true); - - synchronized (statusLock) { - if (status == Status.PUBLISHING) { - runThread.interrupt(); - return; - } - } - - try { - if (pauseLock.tryLock(SeekableStreamIndexTask.LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { - try { - if (pauseRequested) { - pauseRequested = false; - shouldResume.signalAll(); - } - } - finally { - pauseLock.unlock(); - } - } else { - log.warn("While stopping: failed to acquire pauseLock before timeout, interrupting run thread"); - runThread.interrupt(); - return; - } - - if (pollRetryLock.tryLock(SeekableStreamIndexTask.LOCK_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { - try { - isAwaitingRetry.signalAll(); - } - finally { - pollRetryLock.unlock(); - } - } else { - log.warn("While stopping: failed to acquire pollRetryLock before timeout, interrupting run thread"); - runThread.interrupt(); - } - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - - /** - * Authorizes action to be performed on this task's datasource - * - * @return authorization result - */ - private Access authorizationCheck(final HttpServletRequest req, Action action) - { - return IndexTaskUtils.datasourceAuthorizationCheck(req, action, task.getDataSource(), authorizerMapper); - } - - @Override - @POST - @Path("/stop") - public Response stop(@Context final HttpServletRequest req) - { - authorizationCheck(req, Action.WRITE); - stopGracefully(); - return Response.status(Response.Status.OK).build(); - } - - @Override - @GET - @Path("/status") - @Produces(MediaType.APPLICATION_JSON) - 
public Status getStatusHTTP(@Context final HttpServletRequest req) - { - authorizationCheck(req, Action.READ); - return status; - } - - @Override - public Status getStatus() - { - return status; - } - - @Override - @GET - @Path("/offsets/current") - @Produces(MediaType.APPLICATION_JSON) - public Map getCurrentOffsets(@Context final HttpServletRequest req) - { - authorizationCheck(req, Action.READ); - return getCurrentOffsets(); - } - - @Override - public ConcurrentMap getCurrentOffsets() - { - return nextOffsets; - } - - @Override - @GET - @Path("/offsets/end") - @Produces(MediaType.APPLICATION_JSON) - public Map getEndOffsetsHTTP(@Context final HttpServletRequest req) - { - authorizationCheck(req, Action.READ); - return getEndOffsets(); - } - - @Override - public Map getEndOffsets() - { - return endOffsets; - } - - @Override - public Response setEndOffsets(Map sequenceNumbers, boolean finish) throws InterruptedException - { - // finish is not used in this mode - return setEndOffsets(sequenceNumbers); - } - - @POST - @Path("/offsets/end") - @Consumes(MediaType.APPLICATION_JSON) - @Produces(MediaType.APPLICATION_JSON) - public Response setEndOffsetsHTTP( - Map offsets, - @Context final HttpServletRequest req - ) throws InterruptedException - { - authorizationCheck(req, Action.WRITE); - return setEndOffsets(offsets); - } - - @Override - @GET - @Path("/rowStats") - @Produces(MediaType.APPLICATION_JSON) - public Response getRowStats( - @Context final HttpServletRequest req - ) - { - authorizationCheck(req, Action.READ); - Map returnMap = new HashMap<>(); - Map totalsMap = new HashMap<>(); - Map averagesMap = new HashMap<>(); - - totalsMap.put( - RowIngestionMeters.BUILD_SEGMENTS, - rowIngestionMeters.getTotals() - ); - averagesMap.put( - RowIngestionMeters.BUILD_SEGMENTS, - rowIngestionMeters.getMovingAverages() - ); - - returnMap.put("movingAverages", averagesMap); - returnMap.put("totals", totalsMap); - return Response.ok(returnMap).build(); - } - - @Override - @GET - @Path("/unparseableEvents") - @Produces(MediaType.APPLICATION_JSON) - public Response getUnparseableEvents( - @Context final HttpServletRequest req - ) - { - authorizationCheck(req, Action.READ); - List events = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions); - return Response.ok(events).build(); - } - - public Response setEndOffsets( - Map offsets - ) throws InterruptedException - { - if (offsets == null) { - return Response.status(Response.Status.BAD_REQUEST) - .entity("Request body must contain a map of { partition:endOffset }") - .build(); - } else if (!endOffsets.keySet().containsAll(offsets.keySet())) { - return Response.status(Response.Status.BAD_REQUEST) - .entity( - StringUtils.format( - "Request contains partitions not being handled by this task, my partitions: %s", - endOffsets.keySet() - ) - ) - .build(); - } - - pauseLock.lockInterruptibly(); - try { - if (!isPaused()) { - return Response.status(Response.Status.BAD_REQUEST) - .entity("Task must be paused before changing the end offsets") - .build(); - } - - for (Map.Entry entry : offsets.entrySet()) { - if (entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) { - return Response.status(Response.Status.BAD_REQUEST) - .entity( - StringUtils.format( - "End offset must be >= current offset for partition [%s] (current: %s)", - entry.getKey(), - nextOffsets.get(entry.getKey()) - ) - ) - .build(); - } - } - - endOffsets.putAll(offsets); - log.info("endOffsets changed to %s", endOffsets); - } - finally { - pauseLock.unlock(); - } - - 
resume(); - - return Response.ok(endOffsets).build(); - } - - private boolean isPaused() - { - return status == Status.PAUSED; - } - - /** - * Signals the ingestion loop to pause. - * - * @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the - * method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets - * in the response body if the task successfully paused - */ - @Override - @POST - @Path("/pause") - @Produces(MediaType.APPLICATION_JSON) - public Response pauseHTTP( - @Context final HttpServletRequest req - ) throws InterruptedException - { - authorizationCheck(req, Action.WRITE); - return pause(); - } - - @Override - public Response pause() throws InterruptedException - { - if (!(status == Status.PAUSED || status == Status.READING)) { - return Response.status(Response.Status.BAD_REQUEST) - .entity(StringUtils.format("Can't pause, task is not in a pausable state (state: [%s])", status)) - .build(); - } - - pauseLock.lockInterruptibly(); - try { - pauseRequested = true; - - pollRetryLock.lockInterruptibly(); - try { - isAwaitingRetry.signalAll(); - } - finally { - pollRetryLock.unlock(); - } - - if (isPaused()) { - shouldResume.signalAll(); // kick the monitor so it re-awaits with the new pauseMillis - } - - long nanos = TimeUnit.SECONDS.toNanos(2); - while (!isPaused()) { - if (nanos <= 0L) { - return Response.status(Response.Status.ACCEPTED) - .entity("Request accepted but task has not yet paused") - .build(); - } - nanos = hasPaused.awaitNanos(nanos); - } - } - finally { - pauseLock.unlock(); - } - - try { - return Response.ok().entity(objectMapper.writeValueAsString(getCurrentOffsets())).build(); - } - catch (JsonProcessingException e) { - throw new RuntimeException(e); - } - } - - @Override - @POST - @Path("/resume") - public Response resumeHTTP(@Context final HttpServletRequest req) throws InterruptedException - { - authorizationCheck(req, Action.WRITE); - resume(); - return Response.status(Response.Status.OK).build(); - } - - @Override - public void resume() throws InterruptedException - { - pauseLock.lockInterruptibly(); - try { - pauseRequested = false; - shouldResume.signalAll(); - - long nanos = TimeUnit.SECONDS.toNanos(5); - while (isPaused()) { - if (nanos <= 0L) { - throw new RuntimeException("Resume command was not accepted within 5 seconds"); - } - nanos = shouldResume.awaitNanos(nanos); - } - } - finally { - pauseLock.unlock(); - } - } - - @Override - protected SeekableStreamDataSourceMetadata createDataSourceMetadata( - SeekableStreamSequenceNumbers partitions - ) - { - throw new UnsupportedOperationException(); - } - - @Override - protected OrderedSequenceNumber createSequenceNumber(Long sequenceNumber) - { - throw new UnsupportedOperationException(); - } - - @Override - @GET - @Path("/time/start") - @Produces(MediaType.APPLICATION_JSON) - public DateTime getStartTime(@Context final HttpServletRequest req) - { - authorizationCheck(req, Action.WRITE); - return startTime; - } - - @Nullable - @Override - protected TreeMap> getCheckPointsFromContext( - TaskToolbox toolbox, - String checkpointsString - ) - { - throw new UnsupportedOperationException(); - } - -} diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 4c9b39b9d6e3..783af47ca8bb 100644 --- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -140,8 +140,6 @@
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
-import org.apache.druid.segment.loading.SegmentLoaderConfig;
-import org.apache.druid.segment.loading.StorageLocationConfig;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
@@ -165,8 +163,6 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -191,7 +187,6 @@
 import static org.apache.druid.query.QueryPlus.wrap;
 
-@RunWith(Parameterized.class)
 public class KafkaIndexTaskTest
 {
   private static final Logger log = new Logger(KafkaIndexTaskTest.class);
@@ -228,24 +223,10 @@ public class KafkaIndexTaskTest
   private File directory;
   private String topic;
   private List> records;
-  private final boolean isIncrementalHandoffSupported;
   private final Set checkpointRequestsHash = new HashSet<>();
   private File reportsFile;
   private RowIngestionMetersFactory rowIngestionMetersFactory;
 
-  // This should be removed in versions greater that 0.12.x
-  // isIncrementalHandoffSupported should always be set to true in those later versions
-  @Parameterized.Parameters(name = "isIncrementalHandoffSupported = {0}")
-  public static Iterable constructorFeeder()
-  {
-    return ImmutableList.of(new Object[]{true}, new Object[]{false});
-  }
-
-  public KafkaIndexTaskTest(boolean isIncrementalHandoffSupported)
-  {
-    this.isIncrementalHandoffSupported = isIncrementalHandoffSupported;
-  }
-
   private static final DataSchema DATA_SCHEMA = new DataSchema(
       "test_ds",
       OBJECT_MAPPER.convertValue(
@@ -502,9 +483,6 @@ public void testRunBeforeDataInserted() throws Exception
   @Test(timeout = 60_000L)
   public void testIncrementalHandOff() throws Exception
   {
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
     final String baseSequenceName = "sequence0";
     // as soon as any segment has more than one record, incremental publishing should happen
     maxRowsPerSegment = 2;
@@ -608,9 +586,6 @@ public void testIncrementalHandOff() throws Exception
   @Test(timeout = 60_000L)
   public void testIncrementalHandOffMaxTotalRows() throws Exception
   {
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
     final String baseSequenceName = "sequence0";
     // incremental publish should happen every 3 records
     maxRowsPerSegment = Integer.MAX_VALUE;
@@ -763,9 +738,6 @@ public void testIncrementalHandOffMaxTotalRows() throws Exception
   @Test(timeout = 60_000L)
   public void testTimeBasedIncrementalHandOff() throws Exception
   {
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
     final String baseSequenceName = "sequence0";
     // as soon as any segment hits maxRowsPerSegment or intermediateHandoffPeriod, incremental publishing should happen
     maxRowsPerSegment = Integer.MAX_VALUE;
@@ -853,9 +825,6 @@ public void testTimeBasedIncrementalHandOff() throws Exception
   @Test(timeout = 60_000L)
   public void testIncrementalHandOffReadsThroughEndOffsets() throws Exception
   {
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
     records = generateSinglePartitionRecords(topic);
 
     final String baseSequenceName = "sequence0";
@@ -1675,9 +1644,7 @@ public void testRunOneTaskTwoPartitions() throws Exception
     // desc3 will not be created in KafkaIndexTask (0.12.x) as it does not create per Kafka partition Druid segments
     SegmentDescriptor desc3 = sd(task, "2011/P1D", 1);
     SegmentDescriptor desc4 = sd(task, "2012/P1D", 0);
-    Assert.assertEquals(isIncrementalHandoffSupported
-                        ? ImmutableSet.of(desc1, desc2, desc4)
-                        : ImmutableSet.of(desc1, desc2, desc3, desc4), publishedDescriptors());
+    Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc4), publishedDescriptors());
     Assert.assertEquals(
         new KafkaDataSourceMetadata(
             new SeekableStreamEndSequenceNumbers<>(topic, ImmutableMap.of(0, 5L, 1, 2L))
@@ -1691,12 +1658,8 @@ public void testRunOneTaskTwoPartitions() throws Exception
 
     // Check desc2/desc3 without strong ordering because two partitions are interleaved nondeterministically
     Assert.assertEquals(
-        isIncrementalHandoffSupported
-        ? ImmutableSet.of(ImmutableList.of("d", "e", "h"))
-        : ImmutableSet.of(ImmutableList.of("d", "e"), ImmutableList.of("h")),
-        isIncrementalHandoffSupported
-        ? ImmutableSet.of(readSegmentColumn("dim1", desc2))
-        : ImmutableSet.of(readSegmentColumn("dim1", desc2), readSegmentColumn("dim1", desc3))
+        ImmutableSet.of(ImmutableList.of("d", "e", "h")),
+        ImmutableSet.of(readSegmentColumn("dim1", desc2))
     );
   }
 
@@ -1867,10 +1830,6 @@ public void testRestore() throws Exception
   @Test(timeout = 60_000L)
   public void testRestoreAfterPersistingSequences() throws Exception
   {
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
-
     records = generateSinglePartitionRecords(topic);
     maxRowsPerSegment = 2;
     Map consumerProps = kafkaServer.consumerProperties();
@@ -2136,12 +2095,6 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva
   @Test(timeout = 60_000L)
   public void testRunContextSequenceAheadOfStartingOffsets() throws Exception
   {
-    // This tests the case when a replacement task is created in place of a failed test
-    // which has done some incremental handoffs, thus the context will contain starting
-    // sequence offsets from which the task should start reading and ignore the start offsets
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
     // Insert data
     insertData();
 
@@ -2344,9 +2297,6 @@ public void testRunTransactionModeRollback() throws Exception
   @Test(timeout = 60_000L)
   public void testCanStartFromLaterThanEarliestOffset() throws Exception
   {
-    if (!isIncrementalHandoffSupported) {
-      return;
-    }
     final String baseSequenceName = "sequence0";
     maxRowsPerSegment = Integer.MAX_VALUE;
     maxTotalRows = null;
@@ -2542,17 +2492,13 @@ private KafkaIndexTask createTask(
         maxParseExceptions,
         maxSavedParseExceptions
     );
-    if (isIncrementalHandoffSupported) {
-      context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
-
-      if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
-        final TreeMap> checkpoints = new TreeMap<>();
-        checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
-        final String checkpointsJson = OBJECT_MAPPER
-            .writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
-            .writeValueAsString(checkpoints);
-        context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
-      }
+    if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
+      final TreeMap> checkpoints = new TreeMap<>();
+      checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
+      final String checkpointsJson = OBJECT_MAPPER
+          .writerFor(KafkaSupervisor.CHECKPOINTS_TYPE_REF)
+          .writeValueAsString(checkpoints);
+      context.put(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY, checkpointsJson);
     }
 
     final KafkaIndexTask task = new KafkaIndexTask(
@@ -2736,14 +2682,6 @@ public void close()
     final LocalDataSegmentPusherConfig dataSegmentPusherConfig = new LocalDataSegmentPusherConfig();
     dataSegmentPusherConfig.storageDirectory = getSegmentDirectory();
     final DataSegmentPusher dataSegmentPusher = new LocalDataSegmentPusher(dataSegmentPusherConfig);
-    SegmentLoaderConfig segmentLoaderConfig = new SegmentLoaderConfig()
-    {
-      @Override
-      public List getLocations()
-      {
-        return new ArrayList<>();
-      }
-    };
     toolboxFactory = new TaskToolboxFactory(
         taskConfig,
         taskActionClientFactory,
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index a04556d6abf0..ef5c254a6c22 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -2664,8 +2664,6 @@ private KinesisIndexTask createTask(
   ) throws JsonProcessingException
   {
     if (context != null) {
-      context.put(SeekableStreamSupervisor.IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
-
       if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
         final TreeMap> checkpoints = new TreeMap<>();
         checkpoints.put(0, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 966b76cb6a52..755f63057886 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -121,7 +121,6 @@
  */
 public abstract class SeekableStreamSupervisor implements Supervisor
 {
-  public static final String IS_INCREMENTAL_HANDOFF_SUPPORTED = "IS_INCREMENTAL_HANDOFF_SUPPORTED";
   public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
 
   private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
@@ -2752,7 +2751,6 @@ public void checkpoint(
   protected Map createBaseTaskContexts()
   {
     final Map contexts = new HashMap<>();
-    contexts.put(IS_INCREMENTAL_HANDOFF_SUPPORTED, true);
     if (spec.getContext() != null) {
       contexts.putAll(spec.getContext());
     }
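
For quick reference, the net effect on the two production-code methods shown above is sketched below. This is a condensed view assembled from the hunks in this patch, not a verbatim copy of the files; in particular the generic type parameters (`<Integer, Long>` for Kafka partition id and offset, `Map<String, Object>` for the task context) are assumptions filled in here, since the hunks above do not show them.

```java
// KafkaIndexTask: with the IS_INCREMENTAL_HANDOFF_SUPPORTED context flag gone, the
// incremental-publishing runner is created unconditionally instead of branching to
// the removed LegacyKafkaIndexTaskRunner.
@Override
protected SeekableStreamIndexTaskRunner<Integer, Long> createTaskRunner()
{
  //noinspection unchecked
  return new IncrementalPublishingKafkaIndexTaskRunner(
      this,
      dataSchema.getParser(),
      authorizerMapper,
      chatHandlerProvider,
      savedParseExceptions,
      rowIngestionMetersFactory
  );
}

// SeekableStreamSupervisor: base task contexts no longer advertise the flag; only the
// spec-provided context is forwarded. The CHECKPOINTS_CTX_KEY constant is untouched.
protected Map<String, Object> createBaseTaskContexts()
{
  final Map<String, Object> contexts = new HashMap<>();
  if (spec.getContext() != null) {
    contexts.putAll(spec.getContext());
  }
  return contexts;
}
```

The test changes mirror this: KafkaIndexTaskTest drops the `isIncrementalHandoffSupported` parameterization and always seeds `CHECKPOINTS_CTX_KEY` into the task context, while KinesisIndexTaskTest and the supervisor stop setting the flag.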