diff --git a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java index a88341b08339..76317d9ca5a2 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java @@ -19,6 +19,7 @@ package org.apache.druid.indexer; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.druid.java.util.common.StringUtils; @@ -31,6 +32,8 @@ public class TaskIdUtils { private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*"); + private static final Joiner UNDERSCORE_JOINER = Joiner.on("_"); + public static void validateId(String thingToValidate, String stringToValidate) { Preconditions.checkArgument( @@ -60,4 +63,9 @@ public static String getRandomId() } return suffix.toString(); } + + public static String getRandomIdWithPrefix(String prefix) + { + return UNDERSCORE_JOINER.join(prefix, TaskIdUtils.getRandomId()); + } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 6c88c5a59c48..b2352fe40c49 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import org.apache.druid.indexer.TaskIdUtils; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.Task; @@ -46,6 
+45,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; @@ -219,7 +219,7 @@ protected List> createIndexTasks( List> taskList = new ArrayList<>(); for (int i = 0; i < replicas; i++) { - String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId()); + String taskId = TaskIdUtils.getRandomIdWithPrefix(baseSequenceName); taskList.add(new KafkaIndexTask( taskId, new TaskResource(baseSequenceName, 1), @@ -334,16 +334,38 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() } @Override - protected void updateLatestSequenceFromStream( - RecordSupplier recordSupplier, - Set> partitions - ) + protected void updatePartitionLagFromStream() { - latestSequenceFromStream = partitions.stream() - .collect(Collectors.toMap( - StreamPartition::getPartitionId, - recordSupplier::getPosition - )); + getRecordSupplierLock().lock(); + try { + Set partitionIds; + try { + partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream()); + } + catch (Exception e) { + log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream()); + throw new StreamException(e); + } + + Set> partitions = partitionIds + .stream() + .map(e -> new StreamPartition<>(getIoConfig().getStream(), e)) + .collect(Collectors.toSet()); + + recordSupplier.seekToLatest(partitions); + + // this method isn't actually computing the lag, just fetching the latests offsets from the stream. 
This is + // because we currently only have record lag for kafka, which can be lazily computed by subtracting the highest + // task offsets from the latest offsets from the stream when it is needed + latestSequenceFromStream = + partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, recordSupplier::getPosition)); + } + catch (InterruptedException e) { + throw new StreamException(e); + } + finally { + getRecordSupplierLock().unlock(); + } } @Override diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index 5d5a307e4fdc..bfd375830497 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.inject.name.Named; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; @@ -97,8 +98,12 @@ protected KinesisRecordSupplier newTaskRecordSupplier() KinesisIndexTaskTuningConfig tuningConfig = ((KinesisIndexTaskTuningConfig) super.tuningConfig); int fetchThreads = tuningConfig.getFetchThreads() != null ? 
tuningConfig.getFetchThreads() - : Math.max(1, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); + : Runtime.getRuntime().availableProcessors() * 2; + Preconditions.checkArgument( + fetchThreads > 0, + "Must have at least one background fetch thread for the record supplier" + ); return new KinesisRecordSupplier( KinesisRecordSupplier.getAmazonKinesisClient( ioConfig.getEndpoint(), @@ -114,7 +119,8 @@ protected KinesisRecordSupplier newTaskRecordSupplier() tuningConfig.getRecordBufferOfferTimeout(), tuningConfig.getRecordBufferFullWait(), tuningConfig.getFetchSequenceNumberTimeout(), - tuningConfig.getMaxRecordsPerPoll() + tuningConfig.getMaxRecordsPerPoll(), + false ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index f4b78304d17c..b32ee149f544 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -54,6 +54,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -82,7 +83,9 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import 
java.util.stream.Collectors; /** @@ -95,6 +98,14 @@ public class KinesisRecordSupplier implements RecordSupplier private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000; private static final long EXCEPTION_RETRY_DELAY_MS = 10000; + /** + * We call getRecords with limit 1000 to make sure that we can find the first (earliest) record in the shard. + * In the case where the shard is constantly removing records that are past their retention period, it is possible + * that we never find the first record in the shard if we use a limit of 1. + */ + private static final int GET_SEQUENCE_NUMBER_RECORD_COUNT = 1000; + private static final int GET_SEQUENCE_NUMBER_RETRY_COUNT = 10; + private static boolean isServiceExceptionRecoverable(AmazonServiceException ex) { final boolean isIOException = ex.getCause() instanceof IOException; @@ -102,6 +113,37 @@ private static boolean isServiceExceptionRecoverable(AmazonServiceException ex) return isIOException || isTimeout; } + /** + * Returns an array with the content between the position and limit of "buffer". This may be the buffer's backing + * array itself. Does not modify position or limit of the buffer. 
+ */ + private static byte[] toByteArray(final ByteBuffer buffer) + { + if (buffer.hasArray() + && buffer.arrayOffset() == 0 + && buffer.position() == 0 + && buffer.array().length == buffer.limit()) { + return buffer.array(); + } else { + final byte[] retVal = new byte[buffer.remaining()]; + buffer.duplicate().get(retVal); + return retVal; + } + } + + /** + * Catch any exception and wrap it in a {@link StreamException} + */ + private static T wrapExceptions(Callable callable) + { + try { + return callable.call(); + } + catch (Exception e) { + throw new StreamException(e); + } + } + private class PartitionResource { private final StreamPartition streamPartition; @@ -112,101 +154,80 @@ private class PartitionResource // to indicate that this shard has no more records to read @Nullable private volatile String shardIterator; - private volatile boolean started; - private volatile boolean stopRequested; - private volatile long currentLagMillis; - PartitionResource(StreamPartition streamPartition) + private final AtomicBoolean fetchStarted = new AtomicBoolean(); + private ScheduledFuture currentFetch; + + private PartitionResource(StreamPartition streamPartition) { this.streamPartition = streamPartition; } - void startBackgroundFetch() + private void startBackgroundFetch() { - if (started) { + if (!backgroundFetchEnabled) { return; } + // if seek has been called + if (shardIterator == null) { + log.warn( + "Skipping background fetch for stream[%s] partition[%s] since seek has not been called for this partition", + streamPartition.getStream(), + streamPartition.getPartitionId() + ); + return; + } + if (fetchStarted.compareAndSet(false, true)) { + log.debug( + "Starting scheduled fetch for stream[%s] partition[%s]", + streamPartition.getStream(), + streamPartition.getPartitionId() + ); - log.info( - "Starting scheduled fetch runnable for stream[%s] partition[%s]", - streamPartition.getStream(), - streamPartition.getPartitionId() - ); - - stopRequested = false; - started = 
true; - - rescheduleRunnable(fetchDelayMillis); - } - - void stopBackgroundFetch() - { - log.info( - "Stopping scheduled fetch runnable for stream[%s] partition[%s]", - streamPartition.getStream(), - streamPartition.getPartitionId() - ); - stopRequested = true; + scheduleBackgroundFetch(fetchDelayMillis); + } } - long getPartitionTimeLag() + private void stopBackgroundFetch() { - return currentLagMillis; + if (fetchStarted.compareAndSet(true, false)) { + log.debug( + "Stopping scheduled fetch for stream[%s] partition[%s]", + streamPartition.getStream(), + streamPartition.getPartitionId() + ); + if (currentFetch != null && !currentFetch.isDone()) { + currentFetch.cancel(true); + } + } } - long getPartitionTimeLag(String offset) + private void scheduleBackgroundFetch(long delayMillis) { - // if not started (fetching records in background), fetch lag ourself with a throw-away iterator - if (!started) { + if (fetchStarted.get()) { try { - final String iteratorType; - final String offsetToUse; - if (offset == null || KinesisSupervisor.NOT_SET.equals(offset)) { - // this should probably check if will start processing earliest or latest rather than assuming earliest - // if latest we could skip this because latest will not be behind latest so lag is 0. 
- iteratorType = ShardIteratorType.TRIM_HORIZON.toString(); - offsetToUse = null; - } else { - iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString(); - offsetToUse = offset; - } - String shardIterator = kinesis.getShardIterator( - streamPartition.getStream(), - streamPartition.getPartitionId(), - iteratorType, - offsetToUse - ).getShardIterator(); - - GetRecordsResult recordsResult = kinesis.getRecords( - new GetRecordsRequest().withShardIterator(shardIterator).withLimit(recordsPerFetch) - ); - - currentLagMillis = recordsResult.getMillisBehindLatest(); - return currentLagMillis; + currentFetch = scheduledExec.schedule(fetchRecords(), delayMillis, TimeUnit.MILLISECONDS); } - catch (Exception ex) { - // eat it + catch (RejectedExecutionException e) { log.warn( - ex, - "Failed to determine partition lag for partition %s of stream %s", - streamPartition.getPartitionId(), - streamPartition.getStream() + e, + "Caught RejectedExecutionException, KinesisRecordSupplier for partition[%s] has likely temporarily shutdown the ExecutorService. 
" + + "This is expected behavior after calling seek(), seekToEarliest() and seekToLatest()", + streamPartition.getPartitionId() ); + } + } else { + log.debug("Worker for partition[%s] is already stopped", streamPartition.getPartitionId()); } - return currentLagMillis; } - private Runnable getRecordRunnable() + private Runnable fetchRecords() { return () -> { - - if (stopRequested) { - started = false; - stopRequested = false; - - log.info("Worker for partition[%s] has been stopped", streamPartition.getPartitionId()); + if (!fetchStarted.get()) { + log.debug("Worker for partition[%s] has been stopped", streamPartition.getPartitionId()); return; } @@ -231,7 +252,7 @@ private Runnable getRecordRunnable() if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) { log.warn("OrderedPartitionableRecord buffer full, retrying in [%,dms]", recordBufferFullWait); - rescheduleRunnable(recordBufferFullWait); + scheduleBackgroundFetch(recordBufferFullWait); } return; @@ -298,14 +319,14 @@ private Runnable getRecordRunnable() currRecord.getSequenceNumber() ).getShardIterator(); - rescheduleRunnable(recordBufferFullWait); + scheduleBackgroundFetch(recordBufferFullWait); return; } } shardIterator = recordsResult.getNextShardIterator(); // will be null if the shard has been closed - rescheduleRunnable(fetchDelayMillis); + scheduleBackgroundFetch(fetchDelayMillis); } catch (ProvisionedThroughputExceededException e) { log.warn( @@ -315,7 +336,7 @@ private Runnable getRecordRunnable() + "the available throughput. Reduce the frequency or size of your requests." 
); long retryMs = Math.max(PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS, fetchDelayMillis); - rescheduleRunnable(retryMs); + scheduleBackgroundFetch(retryMs); } catch (InterruptedException e) { // may happen if interrupted while BlockingQueue.offer() is waiting @@ -324,7 +345,7 @@ private Runnable getRecordRunnable() "Interrupted while waiting to add record to buffer, retrying in [%,dms]", EXCEPTION_RETRY_DELAY_MS ); - rescheduleRunnable(EXCEPTION_RETRY_DELAY_MS); + scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS); } catch (ExpiredIteratorException e) { log.warn( @@ -334,7 +355,7 @@ private Runnable getRecordRunnable() ); if (recordsResult != null) { shardIterator = recordsResult.getNextShardIterator(); // will be null if the shard has been closed - rescheduleRunnable(fetchDelayMillis); + scheduleBackgroundFetch(fetchDelayMillis); } else { throw new ISE("can't reschedule fetch records runnable, recordsResult is null??"); } @@ -347,7 +368,7 @@ private Runnable getRecordRunnable() catch (AmazonServiceException e) { if (isServiceExceptionRecoverable(e)) { log.warn(e, "encounted unknown recoverable AWS exception, retrying in [%,dms]", EXCEPTION_RETRY_DELAY_MS); - rescheduleRunnable(EXCEPTION_RETRY_DELAY_MS); + scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS); } else { log.warn(e, "encounted unknown unrecoverable AWS exception, will not retry"); throw new RuntimeException(e); @@ -355,31 +376,32 @@ private Runnable getRecordRunnable() } catch (Throwable e) { // non transient errors - log.error(e, "unknown getRecordRunnable exception, will not retry"); + log.error(e, "unknown fetchRecords exception, will not retry"); throw new RuntimeException(e); } }; } - private void rescheduleRunnable(long delayMillis) + private void seek(ShardIteratorType iteratorEnum, String sequenceNumber) { - if (started && !stopRequested) { - try { - scheduledExec.schedule(getRecordRunnable(), delayMillis, TimeUnit.MILLISECONDS); - } - catch (RejectedExecutionException e) { - log.warn( - e, - 
"Caught RejectedExecutionException, KinesisRecordSupplier for partition[%s] has likely temporarily shutdown the ExecutorService. " - + "This is expected behavior after calling seek(), seekToEarliest() and seekToLatest()", - streamPartition.getPartitionId() - ); + log.debug( + "Seeking partition [%s] to [%s]", + streamPartition.getPartitionId(), + sequenceNumber != null ? sequenceNumber : iteratorEnum.toString() + ); - } - } else { - log.info("Worker for partition[%s] has been stopped", streamPartition.getPartitionId()); - } + shardIterator = wrapExceptions(() -> kinesis.getShardIterator( + streamPartition.getStream(), + streamPartition.getPartitionId(), + iteratorEnum.toString(), + sequenceNumber + ).getShardIterator()); + } + + private long getPartitionTimeLag() + { + return currentLagMillis; } } @@ -398,6 +420,7 @@ private void rescheduleRunnable(long delayMillis) private final int maxRecordsPerPoll; private final int fetchThreads; private final int recordBufferSize; + private final boolean useEarliestSequenceNumber; private ScheduledExecutorService scheduledExec; @@ -405,8 +428,9 @@ private void rescheduleRunnable(long delayMillis) new ConcurrentHashMap<>(); private BlockingQueue> records; - private volatile boolean checkPartitionsStarted = false; + private final boolean backgroundFetchEnabled; private volatile boolean closed = false; + private AtomicBoolean partitionsFetchStarted = new AtomicBoolean(); public KinesisRecordSupplier( AmazonKinesis amazonKinesis, @@ -418,7 +442,8 @@ public KinesisRecordSupplier( int recordBufferOfferTimeout, int recordBufferFullWait, int fetchSequenceNumberTimeout, - int maxRecordsPerPoll + int maxRecordsPerPoll, + boolean useEarliestSequenceNumber ) { Preconditions.checkNotNull(amazonKinesis); @@ -432,6 +457,8 @@ public KinesisRecordSupplier( this.maxRecordsPerPoll = maxRecordsPerPoll; this.fetchThreads = fetchThreads; this.recordBufferSize = recordBufferSize; + this.useEarliestSequenceNumber = useEarliestSequenceNumber; + 
this.backgroundFetchEnabled = fetchThreads > 0; // the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache. // The work around here is to use reflection to find the deaggregate function in the classpath. See details on the @@ -459,16 +486,18 @@ public KinesisRecordSupplier( getDataHandle = null; } - log.info( - "Creating fetch thread pool of size [%d] (Runtime.availableProcessors=%d)", - fetchThreads, - Runtime.getRuntime().availableProcessors() - ); + if (backgroundFetchEnabled) { + log.info( + "Creating fetch thread pool of size [%d] (Runtime.availableProcessors=%d)", + fetchThreads, + Runtime.getRuntime().availableProcessors() + ); - scheduledExec = Executors.newScheduledThreadPool( - fetchThreads, - Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d") - ); + scheduledExec = Executors.newScheduledThreadPool( + fetchThreads, + Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d") + ); + } records = new LinkedBlockingQueue<>(recordBufferSize); } @@ -517,12 +546,35 @@ public static AmazonKinesis getAmazonKinesisClient( public void start() { checkIfClosed(); - if (checkPartitionsStarted) { + if (backgroundFetchEnabled && partitionsFetchStarted.compareAndSet(false, true)) { partitionResources.values().forEach(PartitionResource::startBackgroundFetch); - checkPartitionsStarted = false; } } + @Override + public void close() + { + if (this.closed) { + return; + } + + assign(ImmutableSet.of()); + + scheduledExec.shutdown(); + + try { + if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) { + scheduledExec.shutdownNow(); + } + } + catch (InterruptedException e) { + log.warn(e, "InterruptedException while shutting down"); + throw new RuntimeException(e); + } + + this.closed = true; + } + @Override public void assign(Set> collection) { @@ -535,56 +587,55 @@ public void assign(Set> collection) ) ); - for (Iterator, PartitionResource>> i = partitionResources.entrySet() - 
.iterator(); i.hasNext(); ) { + Iterator, PartitionResource>> i = partitionResources.entrySet().iterator(); + while (i.hasNext()) { Map.Entry, PartitionResource> entry = i.next(); if (!collection.contains(entry.getKey())) { i.remove(); entry.getValue().stopBackgroundFetch(); } } + } + @Override + public Collection> getAssignment() + { + return partitionResources.keySet(); } @Override public void seek(StreamPartition partition, String sequenceNumber) throws InterruptedException { - checkIfClosed(); - filterBufferAndResetFetchRunnable(ImmutableSet.of(partition)); - seekInternal(partition, sequenceNumber, ShardIteratorType.AT_SEQUENCE_NUMBER); + filterBufferAndResetBackgroundFetch(ImmutableSet.of(partition)); + partitionSeek(partition, sequenceNumber, ShardIteratorType.AT_SEQUENCE_NUMBER); } @Override public void seekToEarliest(Set> partitions) throws InterruptedException { - checkIfClosed(); - filterBufferAndResetFetchRunnable(partitions); - partitions.forEach(partition -> seekInternal(partition, null, ShardIteratorType.TRIM_HORIZON)); + filterBufferAndResetBackgroundFetch(partitions); + partitions.forEach(partition -> partitionSeek(partition, null, ShardIteratorType.TRIM_HORIZON)); } @Override public void seekToLatest(Set> partitions) throws InterruptedException { - checkIfClosed(); - filterBufferAndResetFetchRunnable(partitions); - partitions.forEach(partition -> seekInternal(partition, null, ShardIteratorType.LATEST)); + filterBufferAndResetBackgroundFetch(partitions); + partitions.forEach(partition -> partitionSeek(partition, null, ShardIteratorType.LATEST)); } + @Nullable @Override - public Collection> getAssignment() + public String getPosition(StreamPartition partition) { - return partitionResources.keySet(); + throw new UnsupportedOperationException("getPosition() is not supported in Kinesis"); } @Nonnull @Override public List> poll(long timeout) { - checkIfClosed(); - if (checkPartitionsStarted) { - 
partitionResources.values().forEach(PartitionResource::startBackgroundFetch); - checkPartitionsStarted = false; - } + start(); try { int expectedSize = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll); @@ -616,23 +667,14 @@ public List> poll(long timeout) @Override public String getLatestSequenceNumber(StreamPartition partition) { - checkIfClosed(); - return getSequenceNumberInternal(partition, ShardIteratorType.LATEST); + return getSequenceNumber(partition, ShardIteratorType.LATEST); } @Nullable @Override public String getEarliestSequenceNumber(StreamPartition partition) { - checkIfClosed(); - return getSequenceNumberInternal(partition, ShardIteratorType.TRIM_HORIZON); - } - - @Nullable - @Override - public String getPosition(StreamPartition partition) - { - throw new UnsupportedOperationException("getPosition() is not supported in Kinesis"); + return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON); } @Override @@ -665,180 +707,179 @@ public Set getPartitionIds(String stream) ); } - @Override - public void close() + /** + * Fetch the partition lag, given a stream and set of current partition offsets. This operates independently from + * the {@link PartitionResource} which have been assigned to this record supplier. 
+ */ + public Map getPartitionsTimeLag(String stream, Map currentOffsets) { - if (this.closed) { - return; - } - - assign(ImmutableSet.of()); - - scheduledExec.shutdown(); - - try { - if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) { - scheduledExec.shutdownNow(); - } - } - catch (InterruptedException e) { - log.warn(e, "InterruptedException while shutting down"); - throw new RuntimeException(e); + Map partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size()); + for (Map.Entry partitionOffset : currentOffsets.entrySet()) { + StreamPartition partition = new StreamPartition<>(stream, partitionOffset.getKey()); + long currentLag = getPartitionTimeLag(partition, partitionOffset.getValue()); + partitionLag.put(partitionOffset.getKey(), currentLag); } - - this.closed = true; + return partitionLag; } - // this is only used for tests + /** + * This method is only used for tests to verify that {@link PartitionResource} in fact tracks it's current lag + * as it is polled for records. This isn't currently used in production at all, but could be one day if we were + * to prefer to get the lag from the running tasks in the same API call which fetches the current task offsets, + * instead of directly calling the AWS Kinesis API with the offsets returned from those tasks + * (see {@link #getPartitionsTimeLag}, which accepts a map of current partition offsets). 
+ */ @VisibleForTesting - Map getPartitionTimeLag() + Map getPartitionResourcesTimeLag() { return partitionResources.entrySet() .stream() .collect( - Collectors.toMap(k -> k.getKey().getPartitionId(), k -> k.getValue().getPartitionTimeLag()) + Collectors.toMap( + k -> k.getKey().getPartitionId(), + k -> k.getValue().getPartitionTimeLag() + ) ); } - public Map getPartitionTimeLag(Map currentOffsets) + @VisibleForTesting + public int bufferSize() { - Map partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size()); - for (Map.Entry, PartitionResource> partition : partitionResources.entrySet()) { - final String partitionId = partition.getKey().getPartitionId(); - partitionLag.put(partitionId, partition.getValue().getPartitionTimeLag(currentOffsets.get(partitionId))); - } - return partitionLag; + return records.size(); + } + + @VisibleForTesting + public boolean isBackgroundFetchRunning() + { + return partitionsFetchStarted.get(); } - private void seekInternal(StreamPartition partition, String sequenceNumber, ShardIteratorType iteratorEnum) + /** + * Check that a {@link PartitionResource} has been assigned to this record supplier, and if so call + * {@link PartitionResource#seek} to move it to the latest offsets. Note that this method does not restart background + * fetch, which should have been stopped prior to calling this method by a call to + * {@link #filterBufferAndResetBackgroundFetch}. + */ + private void partitionSeek(StreamPartition partition, String sequenceNumber, ShardIteratorType iteratorEnum) { PartitionResource resource = partitionResources.get(partition); if (resource == null) { throw new ISE("Partition [%s] has not been assigned", partition); } - - log.debug( - "Seeking partition [%s] to [%s]", - partition.getPartitionId(), - sequenceNumber != null ? 
sequenceNumber : iteratorEnum.toString() - ); - - resource.shardIterator = wrapExceptions(() -> kinesis.getShardIterator( - partition.getStream(), - partition.getPartitionId(), - iteratorEnum.toString(), - sequenceNumber - ).getShardIterator()); - - checkPartitionsStarted = true; + resource.seek(iteratorEnum, sequenceNumber); } - private void filterBufferAndResetFetchRunnable(Set> partitions) throws InterruptedException + /** + * Given a partition and a {@link ShardIteratorType}, create a shard iterator and fetch + * {@link #GET_SEQUENCE_NUMBER_RECORD_COUNT} records and return the first sequence number from the result set. + * This method is thread safe as it does not depend on the internal state of the supplier (it doesn't use the + * {@link PartitionResource} which have been assigned to the supplier), and the Kinesis client is thread safe. + */ + @Nullable + private String getSequenceNumber(StreamPartition partition, ShardIteratorType iteratorEnum) { - scheduledExec.shutdown(); - - try { - if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) { - scheduledExec.shutdownNow(); - } - } - catch (InterruptedException e) { - log.warn(e, "InterruptedException while shutting down"); - throw e; - } + return wrapExceptions(() -> { + String shardIterator = + kinesis.getShardIterator(partition.getStream(), partition.getPartitionId(), iteratorEnum.toString()) + .getShardIterator(); + long timeoutMillis = System.currentTimeMillis() + fetchSequenceNumberTimeout; + GetRecordsResult recordsResult = null; + + while (shardIterator != null && System.currentTimeMillis() < timeoutMillis) { + + if (closed) { + log.info("KinesisRecordSupplier closed while fetching sequenceNumber"); + return null; + } + final String currentShardIterator = shardIterator; + final GetRecordsRequest request = new GetRecordsRequest().withShardIterator(currentShardIterator) + .withLimit(GET_SEQUENCE_NUMBER_RECORD_COUNT); + recordsResult = RetryUtils.retry( + () -> 
kinesis.getRecords(request), + (throwable) -> { + if (throwable instanceof ProvisionedThroughputExceededException) { + log.warn( + throwable, + "encountered ProvisionedThroughputExceededException while fetching records, this means " + + "that the request rate for the stream is too high, or the requested data is too large for " + + "the available throughput. Reduce the frequency or size of your requests. Consider increasing " + + "the number of shards to increase throughput." + ); + return true; + } + return false; + }, + GET_SEQUENCE_NUMBER_RETRY_COUNT + ); - scheduledExec = Executors.newScheduledThreadPool( - fetchThreads, - Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d") - ); + List records = recordsResult.getRecords(); - // filter records in buffer and only retain ones whose partition was not seeked - BlockingQueue> newQ = new LinkedBlockingQueue<>(recordBufferSize); + if (!records.isEmpty()) { + return records.get(0).getSequenceNumber(); + } - records.stream() - .filter(x -> !partitions.contains(x.getStreamPartition())) - .forEachOrdered(newQ::offer); + shardIterator = recordsResult.getNextShardIterator(); + } - records = newQ; + if (shardIterator == null) { + log.info("Partition[%s] returned a null shard iterator, is the shard closed?", partition.getPartitionId()); + return KinesisSequenceNumber.END_OF_SHARD_MARKER; + } - // restart fetching threads - partitionResources.values().forEach(x -> x.started = false); - checkPartitionsStarted = true; - } - @Nullable - private String getSequenceNumberInternal(StreamPartition partition, ShardIteratorType iteratorEnum) - { - return wrapExceptions(() -> getSequenceNumberInternal( - partition, - kinesis.getShardIterator(partition.getStream(), partition.getPartitionId(), iteratorEnum.toString()) - .getShardIterator() - )); + // if we reach here, it usually means either the shard has no more records, or records have not been + // added to this shard + log.warn( + "timed out while trying to fetch position for 
shard[%s], millisBehindLatest is [%s], likely no more records in shard", + partition.getPartitionId(), + recordsResult != null ? recordsResult.getMillisBehindLatest() : "UNKNOWN" + ); + return null; + }); } - @Nullable - private String getSequenceNumberInternal(StreamPartition partition, String shardIterator) + /** + * Given a {@link StreamPartition} and an offset, create a 'shard iterator' for the offset and fetch a single record + * in order to get the lag: {@link GetRecordsResult#getMillisBehindLatest()}. This method is thread safe as it does + * not depend on the internal state of the supplier (it doesn't use the {@link PartitionResource} which have been + * assigned to the supplier), and the Kinesis client is thread safe. + */ + private Long getPartitionTimeLag(StreamPartition partition, String offset) { - long timeoutMillis = System.currentTimeMillis() + fetchSequenceNumberTimeout; - GetRecordsResult recordsResult = null; - - while (shardIterator != null && System.currentTimeMillis() < timeoutMillis) { - - if (closed) { - log.info("KinesisRecordSupplier closed while fetching sequenceNumber"); - return null; - } - try { - // we call getRecords with limit 1000 to make sure that we can find the first (earliest) record in the shard. - // In the case where the shard is constantly removing records that are past their retention period, it is possible - // that we never find the first record in the shard if we use a limit of 1. - recordsResult = kinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1000)); - } - catch (ProvisionedThroughputExceededException e) { - log.warn( - e, - "encountered ProvisionedThroughputExceededException while fetching records, this means " - + "that the request rate for the stream is too high, or the requested data is too large for " - + "the available throughput. Reduce the frequency or size of your requests. Consider increasing " - + "the number of shards to increase throughput." 
- ); - try { - Thread.sleep(PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS); - continue; - } - catch (InterruptedException e1) { - log.warn(e1, "Thread interrupted!"); - Thread.currentThread().interrupt(); - break; + return wrapExceptions(() -> { + final String iteratorType; + final String offsetToUse; + if (offset == null || KinesisSupervisor.OFFSET_NOT_SET.equals(offset)) { + if (useEarliestSequenceNumber) { + iteratorType = ShardIteratorType.TRIM_HORIZON.toString(); + offsetToUse = null; + } else { + // if offset is not set and not using earliest, it means we will start reading from latest, + // so lag will be 0 and we have nothing to do here + return 0L; } + } else { + iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString(); + offsetToUse = offset; } + String shardIterator = kinesis.getShardIterator( + partition.getStream(), + partition.getPartitionId(), + iteratorType, + offsetToUse + ).getShardIterator(); + + GetRecordsResult recordsResult = kinesis.getRecords( + new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1) + ); - List records = recordsResult.getRecords(); - - if (!records.isEmpty()) { - return records.get(0).getSequenceNumber(); - } - - shardIterator = recordsResult.getNextShardIterator(); - } - - if (shardIterator == null) { - log.info("Partition[%s] returned a null shard iterator, is the shard closed?", partition.getPartitionId()); - return KinesisSequenceNumber.END_OF_SHARD_MARKER; - } - - - // if we reach here, it usually means either the shard has no more records, or records have not been - // added to this shard - log.warn( - "timed out while trying to fetch position for shard[%s], millisBehindLatest is [%s], likely no more records in shard", - partition.getPartitionId(), - recordsResult != null ? recordsResult.getMillisBehindLatest() : "UNKNOWN" - ); - return null; - + return recordsResult.getMillisBehindLatest(); + }); } + /** + * Explode if {@link #close()} has been called on the supplier. 
+ */ private void checkIfClosed() { if (closed) { @@ -847,36 +888,46 @@ private void checkIfClosed() } /** - * Returns an array with the content between the position and limit of "buffer". This may be the buffer's backing - * array itself. Does not modify position or limit of the buffer. + * This method must be called before a seek operation ({@link #seek}, {@link #seekToLatest}, or + * {@link #seekToEarliest}). + * + * When called, it will nuke the {@link #scheduledExec} that is shared by all {@link PartitionResource}, filters + * records from the buffer for partitions which will have a seek operation performed, and stops background fetch for + * each {@link PartitionResource} to prepare for the seek. If background fetch is not currently running, the + * {@link #scheduledExec} will not be re-created. */ - private static byte[] toByteArray(final ByteBuffer buffer) + private void filterBufferAndResetBackgroundFetch(Set> partitions) throws InterruptedException { - if (buffer.hasArray() - && buffer.arrayOffset() == 0 - && buffer.position() == 0 - && buffer.array().length == buffer.limit()) { - return buffer.array(); - } else { - final byte[] retVal = new byte[buffer.remaining()]; - buffer.duplicate().get(retVal); - return retVal; - } - } + checkIfClosed(); + if (backgroundFetchEnabled && partitionsFetchStarted.compareAndSet(true, false)) { + scheduledExec.shutdown(); - private static T wrapExceptions(Callable callable) - { - try { - return callable.call(); - } - catch (Exception e) { - throw new StreamException(e); + try { + if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) { + scheduledExec.shutdownNow(); + } + } + catch (InterruptedException e) { + log.warn(e, "InterruptedException while shutting down"); + throw e; + } + + scheduledExec = Executors.newScheduledThreadPool( + fetchThreads, + Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d") + ); } - } - @VisibleForTesting - public int bufferSize() - { - return 
records.size(); + // filter records in buffer and only retain ones whose partition was not seeked + BlockingQueue> newQ = new LinkedBlockingQueue<>(recordBufferSize); + + records.stream() + .filter(x -> !partitions.contains(x.getStreamPartition())) + .forEachOrdered(newQ::offer); + + records = newQ; + + // restart fetching threads + partitionResources.values().forEach(x -> x.stopBackgroundFetch()); } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java index 664b3b58b32d..401efe72c105 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java @@ -71,7 +71,8 @@ protected KinesisRecordSupplier createRecordSupplier() tuningConfig.getRecordBufferOfferTimeout(), tuningConfig.getRecordBufferFullWait(), tuningConfig.getFetchSequenceNumberTimeout(), - tuningConfig.getMaxRecordsPerPoll() + tuningConfig.getMaxRecordsPerPoll(), + ioConfig.isUseEarliestSequenceNumber() ); } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java index 4bd8fe8d4872..ab13f5bda05c 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java @@ -36,27 +36,33 @@ public class KinesisSequenceNumber extends OrderedSequenceNumber */ public static final String END_OF_SHARD_MARKER = "EOS"; - // this special marker is used by the KinesisSupervisor to set the 
endOffsets - // of newly created indexing tasks. This is necessary because streaming tasks do not - // have endPartitionOffsets. This marker signals to the task that it should continue - // to ingest data until taskDuration has elapsed or the task was stopped or paused or killed + + /** + * This special marker is used by the KinesisSupervisor to set the endOffsets of newly created indexing tasks. This + * is necessary because streaming tasks do not have endPartitionOffsets. This marker signals to the task that it + * should continue to ingest data until taskDuration has elapsed or the task was stopped or paused or killed. + */ public static final String NO_END_SEQUENCE_NUMBER = "NO_END_SEQUENCE_NUMBER"; - // this special marker is used by the KinesisSupervisor to mark that a shard has been expired - // (i.e., closed and then the retention period has passed) + + /** + * This special marker is used by the KinesisSupervisor to mark that a shard has been expired + * (i.e., closed and then the retention period has passed) + */ public static final String EXPIRED_MARKER = "EXPIRED"; - // this flag is used to indicate either END_OF_SHARD_MARKER - // or NO_END_SEQUENCE_NUMBER so that they can be properly compared - // with other sequence numbers + /** + * this flag is used to indicate either END_OF_SHARD_MARKER + * or NO_END_SEQUENCE_NUMBER so that they can be properly compared + * with other sequence numbers + */ private final boolean isMaxSequenceNumber; private final BigInteger intSequence; private KinesisSequenceNumber(String sequenceNumber, boolean isExclusive) { super(sequenceNumber, isExclusive); - if (END_OF_SHARD_MARKER.equals(sequenceNumber) - || NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)) { + if (END_OF_SHARD_MARKER.equals(sequenceNumber) || NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)) { isMaxSequenceNumber = true; this.intSequence = null; } else { diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 0000ee693e3e..494e901dad7e 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.indexer.TaskIdUtils; @@ -49,7 +48,6 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; -import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; @@ -64,6 +62,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.stream.Collectors; /** * Supervisor responsible for managing the KinesisIndexTask for a single dataSource. 
At a high level, the class accepts a @@ -82,7 +81,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor { }; - public static final String NOT_SET = "-1"; + public static final String OFFSET_NOT_SET = "-1"; private final KinesisSupervisorSpec spec; private final AWSCredentialsConfig awsCredentialsConfig; private volatile Map currentPartitionTimeLag; @@ -167,7 +166,7 @@ protected List> createIndexTasks( List> taskList = new ArrayList<>(); for (int i = 0; i < replicas; i++) { - String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId()); + String taskId = TaskIdUtils.getRandomIdWithPrefix(baseSequenceName); taskList.add(new KinesisIndexTask( taskId, new TaskResource(baseSequenceName, 1), @@ -187,8 +186,7 @@ protected List> createIndexTasks( @Override - protected RecordSupplier setupRecordSupplier() - throws RuntimeException + protected RecordSupplier setupRecordSupplier() throws RuntimeException { KinesisSupervisorIOConfig ioConfig = spec.getIoConfig(); KinesisIndexTaskTuningConfig taskTuningConfig = spec.getTuningConfig(); @@ -202,13 +200,14 @@ protected RecordSupplier setupRecordSupplier() ), ioConfig.getRecordsPerFetch(), ioConfig.getFetchDelayMillis(), - 1, + 0, // skip starting background fetch, it is not used ioConfig.isDeaggregate(), taskTuningConfig.getRecordBufferSize(), taskTuningConfig.getRecordBufferOfferTimeout(), taskTuningConfig.getRecordBufferFullWait(), taskTuningConfig.getFetchSequenceNumberTimeout(), - taskTuningConfig.getMaxRecordsPerPoll() + taskTuningConfig.getMaxRecordsPerPoll(), + ioConfig.isUseEarliestSequenceNumber() ); } @@ -296,7 +295,19 @@ protected Map getRecordLagPerPartition(Map current @Override protected Map getTimeLagPerPartition(Map currentOffsets) { - return ((KinesisRecordSupplier) recordSupplier).getPartitionTimeLag(currentOffsets); + return currentOffsets + .entrySet() + .stream() + .filter(e -> e.getValue() != null && + currentPartitionTimeLag != null && + 
currentPartitionTimeLag.get(e.getKey()) != null + ) + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> currentPartitionTimeLag.get(e.getKey()) + ) + ); } @Override @@ -315,13 +326,11 @@ protected OrderedSequenceNumber makeSequenceNumber(String seq, boolean i } @Override - protected void updateLatestSequenceFromStream( - RecordSupplier recordSupplier, - Set> streamPartitions - ) + protected void updatePartitionLagFromStream() { KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier; - currentPartitionTimeLag = supplier.getPartitionTimeLag(getHighestCurrentOffsets()); + // this recordSupplier method is thread safe, so does not need to acquire the recordSupplierLock + currentPartitionTimeLag = supplier.getPartitionsTimeLag(getIoConfig().getStream(), getHighestCurrentOffsets()); } @Override @@ -345,7 +354,7 @@ protected String baseTaskName() @Override protected String getNotSetMarker() { - return NOT_SET; + return OFFSET_NOT_SET; } @Override @@ -454,7 +463,7 @@ private SeekableStreamDataSourceMetadata createDataSourceMetadat } } - newSequences = new SeekableStreamStartSequenceNumbers( + newSequences = new SeekableStreamStartSequenceNumbers<>( old.getStream(), null, newPartitionSequenceNumberMap, @@ -462,7 +471,7 @@ private SeekableStreamDataSourceMetadata createDataSourceMetadat newExclusiveStartPartitions ); } else { - newSequences = new SeekableStreamEndSequenceNumbers( + newSequences = new SeekableStreamEndSequenceNumbers<>( old.getStream(), null, newPartitionSequenceNumberMap, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index c886065955d1..4daa2fdc5d9c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2034,7 +2034,7 @@ public void testRestoreAfterPersistingSequences() throws Exception .andReturn(Collections.emptyList()) .anyTimes(); - EasyMock.expect(recordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + EasyMock.expect(recordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject())) .andReturn(null) .anyTimes(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 4a631e84fa66..6b2f32f6cdda 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -203,7 +203,8 @@ public void testSupplierSetup() 5000, 5000, 60000, - 5 + 5, + true ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -212,6 +213,9 @@ public void testSupplierSetup() Assert.assertEquals(partitions, recordSupplier.getAssignment()); Assert.assertEquals(ImmutableSet.of(SHARD_ID1, SHARD_ID0), recordSupplier.getPartitionIds(STREAM)); + + // calling poll would start background fetch if seek was called, but will instead be skipped and the results + // empty Assert.assertEquals(Collections.emptyList(), recordSupplier.poll(100)); verifyAll(); @@ -290,7 +294,8 @@ public void testPoll() throws InterruptedException 5000, 5000, 60000, - 100 + 100, + true ); recordSupplier.assign(partitions); @@ -308,7 +313,7 @@ public void testPoll() throws InterruptedException Assert.assertEquals(partitions, recordSupplier.getAssignment()); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS)); - Assert.assertEquals(SHARDS_LAG_MILLIS, 
recordSupplier.getPartitionTimeLag()); + Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag()); } @Test @@ -367,7 +372,8 @@ public void testSeek() 5000, 5000, 60000, - 100 + 100, + true ); recordSupplier.assign(partitions); @@ -386,7 +392,7 @@ public void testSeek() Assert.assertEquals(9, polledRecords.size()); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(4, 12))); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(1, 2))); - Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag()); + Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag()); } @@ -434,7 +440,8 @@ public void testSeekToLatest() 5000, 5000, 60000, - 100 + 100, + true ); recordSupplier.assign(partitions); @@ -468,7 +475,8 @@ public void testSeekUnassigned() throws InterruptedException 5000, 5000, 60000, - 5 + 5, + true ); recordSupplier.assign(partitions); @@ -530,7 +538,8 @@ public void testPollAfterSeek() 5000, 5000, 60000, - 1 + 1, + true ); recordSupplier.assign(partitions); @@ -549,7 +558,7 @@ public void testPollAfterSeek() ); // only one partition in this test. first results come from getRecordsResult1, which has SHARD1_LAG_MILLIS - Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD1_LAG_MILLIS), recordSupplier.getPartitionTimeLag()); + Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD1_LAG_MILLIS), recordSupplier.getPartitionResourcesTimeLag()); recordSupplier.seek(StreamPartition.of(STREAM, SHARD_ID1), "7"); recordSupplier.start(); @@ -563,7 +572,7 @@ public void testPollAfterSeek() Assert.assertEquals(ALL_RECORDS.get(9), record2); // only one partition in this test. 
second results come from getRecordsResult0, which has SHARD0_LAG_MILLIS - Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD0_LAG_MILLIS), recordSupplier.getPartitionTimeLag()); + Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD0_LAG_MILLIS), recordSupplier.getPartitionResourcesTimeLag()); verifyAll(); } @@ -622,7 +631,8 @@ public void testPollDeaggregate() throws InterruptedException 5000, 5000, 60000, - 100 + 100, + true ); recordSupplier.assign(partitions); @@ -640,7 +650,7 @@ public void testPollDeaggregate() throws InterruptedException Assert.assertEquals(partitions, recordSupplier.getAssignment()); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS)); - Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag()); + Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag()); } @Test @@ -692,7 +702,8 @@ private KinesisRecordSupplier getSequenceNumberWhenShardIsEmptyShouldReturnsNull 5000, 5000, 1000, - 100 + 100, + true ); return recordSupplier; } @@ -705,16 +716,14 @@ public void getPartitionTimeLag() throws InterruptedException EasyMock.eq(SHARD_ID0), EasyMock.anyString(), EasyMock.anyString() - )).andReturn( - getShardIteratorResult0).anyTimes(); + )).andReturn(getShardIteratorResult0).anyTimes(); EasyMock.expect(kinesis.getShardIterator( EasyMock.anyObject(), EasyMock.eq(SHARD_ID1), EasyMock.anyString(), EasyMock.anyString() - )).andReturn( - getShardIteratorResult1).anyTimes(); + )).andReturn(getShardIteratorResult1).anyTimes(); EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); @@ -728,8 +737,8 @@ public void getPartitionTimeLag() throws InterruptedException EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); 
EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); - EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); - EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).times(2); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).times(2); replayAll(); @@ -738,7 +747,6 @@ public void getPartitionTimeLag() throws InterruptedException StreamPartition.of(STREAM, SHARD_ID1) ); - recordSupplier = new KinesisRecordSupplier( kinesis, recordsPerFetch, @@ -749,7 +757,8 @@ public void getPartitionTimeLag() throws InterruptedException 5000, 5000, 60000, - 100 + 100, + true ); recordSupplier.assign(partitions); @@ -760,16 +769,19 @@ public void getPartitionTimeLag() throws InterruptedException Thread.sleep(100); } + Map timeLag = recordSupplier.getPartitionResourcesTimeLag(); + + + Assert.assertEquals(partitions, recordSupplier.getAssignment()); + Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag); + Map offsts = ImmutableMap.of( SHARD_ID1, SHARD1_RECORDS.get(0).getSequenceNumber(), SHARD_ID0, SHARD0_RECORDS.get(0).getSequenceNumber() ); - Map timeLag = recordSupplier.getPartitionTimeLag(offsts); - + Map independentTimeLag = recordSupplier.getPartitionsTimeLag(STREAM, offsts); + Assert.assertEquals(SHARDS_LAG_MILLIS, independentTimeLag); verifyAll(); - - Assert.assertEquals(partitions, recordSupplier.getAssignment()); - Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag); } /** diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 816e0aa9321c..f3a21d9b2278 100644 --- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -129,7 +129,6 @@ public class KinesisSupervisorTest extends EasyMockSupport private static final StreamPartition SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID0); private static final StreamPartition SHARD1_PARTITION = StreamPartition.of(STREAM, SHARD_ID1); private static final StreamPartition SHARD2_PARTITION = StreamPartition.of(STREAM, SHARD_ID2); - private static final Map TIME_LAG = ImmutableMap.of(SHARD_ID1, 9000L, SHARD_ID0, 1234L); private static DataSchema dataSchema; private KinesisRecordSupplier supervisorRecordSupplier; @@ -232,9 +231,6 @@ public void testNoInitialState() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -284,6 +280,68 @@ public void testNoInitialState() throws Exception ); } + @Test + public void testRecordSupplier() + { + KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( + STREAM, + INPUT_FORMAT, + "awsEndpoint", + null, + 1, + 1, + new Period("PT30M"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + 100, + 1000, + null, + null, + false + ); + KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, OBJECT_MAPPER); + KinesisSupervisor supervisor = new KinesisSupervisor( + taskStorage, + taskMaster, + 
indexerMetadataStorageCoordinator, + clientFactory, + OBJECT_MAPPER, + new KinesisSupervisorSpec( + null, + dataSchema, + tuningConfig, + kinesisSupervisorIOConfig, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + clientFactory, + OBJECT_MAPPER, + new NoopServiceEmitter(), + new DruidMonitorSchedulerConfig(), + rowIngestionMetersFactory, + null, + new SupervisorStateManagerConfig() + ), + rowIngestionMetersFactory, + null + ); + + KinesisRecordSupplier supplier = (KinesisRecordSupplier) supervisor.setupRecordSupplier(); + Assert.assertNotNull(supplier); + Assert.assertEquals(0, supplier.bufferSize()); + Assert.assertEquals(Collections.emptySet(), supplier.getAssignment()); + // background fetch should not be enabled for supervisor supplier + supplier.start(); + Assert.assertFalse(supplier.isBackgroundFetchRunning()); + } + @Test public void testMultiTask() throws Exception { @@ -302,9 +360,6 @@ public void testMultiTask() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -364,9 +419,6 @@ public void testReplicas() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); 
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -444,9 +496,6 @@ public void testLateMessageRejectionPeriod() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -499,9 +548,6 @@ public void testEarlyMessageRejectionPeriod() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -561,9 +607,6 @@ public void testDatasourceMetadata() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -661,9 +704,6 @@ public void testDontKillTasksWithMismatchedType() throws Exception 
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); // non KinesisIndexTask (don't kill) Task id2 = new RealtimeIndexTask( @@ -730,9 +770,6 @@ public void testKillBadPartitionAssignment() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -845,9 +882,6 @@ public void testRequeueTaskWhenFailed() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -959,9 +993,6 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); DateTime now = DateTimes.nowUtc(); DateTime maxi = 
now.plusMinutes(60); @@ -1099,9 +1130,6 @@ public void testQueueNextTasksOnSuccess() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -1228,9 +1256,6 @@ public void testBeginPublishAndQueueNextTasks() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); final Capture firstTasks = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -1361,7 +1386,7 @@ public void testDiscoverExistingPublishingTask() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject())) .andReturn(timeLag) .atLeastOnce(); @@ -1453,9 +1478,6 @@ public void testDiscoverExistingPublishingTask() throws Exception Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState()); Assert.assertEquals(0, payload.getRecentErrors().size()); - 
Assert.assertEquals(timeLag, payload.getMinimumLagMillis()); - Assert.assertEquals(20000000L, (long) payload.getAggregateLagMillis()); - TaskReportData publishingReport = payload.getPublishingTasks().get(0); Assert.assertEquals("id1", publishingReport.getId()); @@ -1525,7 +1547,7 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject())) .andReturn(timeLag) .atLeastOnce(); @@ -1605,9 +1627,6 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() Assert.assertEquals(1, payload.getPublishingTasks().size()); Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState()); Assert.assertEquals(0, payload.getRecentErrors().size()); - Assert.assertEquals(timeLag, payload.getMinimumLagMillis()); - Assert.assertEquals(9000L + 1234L, (long) payload.getAggregateLagMillis()); - TaskReportData publishingReport = payload.getPublishingTasks().get(0); @@ -1681,7 +1700,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject())) .andReturn(timeLag) .atLeastOnce(); Task id1 = createKinesisIndexTask( @@ -1859,9 +1878,6 @@ public void 
testKillUnresponsiveTasksWhileGettingStartTime() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -1944,9 +1960,6 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -2056,10 +2069,6 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); - Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); @@ -2207,9 +2216,6 @@ public void testStopGracefully() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); 
EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -2618,9 +2624,6 @@ public void testResetRunningTasks() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -2847,9 +2850,6 @@ public void testNoDataIngestionTasks() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); @@ -3005,9 +3005,6 @@ public void testCheckpointForInactiveTaskGroup() throws InterruptedException EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -3261,9 +3258,6 @@ public void testSuspendedRunningTasks() throws Exception 
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -3482,7 +3476,8 @@ public void testGetCurrentTotalStats() } @Test - public void testDoNotKillCompatibleTasks() throws InterruptedException, EntryExistsException + public void testDoNotKillCompatibleTasks() + throws InterruptedException, EntryExistsException { // This supervisor always returns true for isTaskCurrent -> it should not kill its tasks int numReplicas = 2; @@ -3512,9 +3507,6 @@ public void testDoNotKillCompatibleTasks() throws InterruptedException, EntryExi EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task task = createKinesisIndexTask( "id2", @@ -3582,7 +3574,8 @@ public void testDoNotKillCompatibleTasks() throws InterruptedException, EntryExi } @Test - public void testKillIncompatibleTasks() throws InterruptedException, EntryExistsException + public void testKillIncompatibleTasks() + throws InterruptedException, EntryExistsException { // This supervisor always returns false for isTaskCurrent -> it should kill its tasks int numReplicas = 2; @@ -3611,9 +3604,6 @@ public void testKillIncompatibleTasks() throws InterruptedException, EntryExists EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); 
EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task task = createKinesisIndexTask( "id1", @@ -3856,9 +3846,6 @@ private List testShardSplitPhaseOne() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -3967,9 +3954,6 @@ private List testShardSplitPhaseTwo(List phaseOneTasks) throws Excep supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture postSplitCaptured = Capture.newInstance(CaptureType.ALL); @@ -4145,9 +4129,6 @@ private void testShardSplitPhaseThree(List phaseTwoTasks) throws Exception supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture postSplitCaptured = Capture.newInstance(CaptureType.ALL); @@ -4311,9 +4292,6 @@ private List testShardMergePhaseOne() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - 
.andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -4433,9 +4411,6 @@ private List testShardMergePhaseTwo(List phaseOneTasks) throws Excep supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture postMergeCaptured = Capture.newInstance(CaptureType.ALL); @@ -4590,9 +4565,6 @@ private void testShardMergePhaseThree(List phaseTwoTasks) throws Exception supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture postSplitCaptured = Capture.newInstance(CaptureType.ALL); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 247ff0e00c10..cc4b986bd1a7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -37,6 +37,8 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import org.apache.commons.codec.digest.DigestUtils; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; @@ -108,6 +110,7 @@ import 
java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -479,7 +482,7 @@ boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup) private final BlockingQueue notices = new LinkedBlockingDeque<>(); private final Object stopLock = new Object(); private final Object stateChangeLock = new Object(); - private final Object recordSupplierLock = new Object(); + private final ReentrantLock recordSupplierLock = new ReentrantLock(); private final boolean useExclusiveStartingSequence; private boolean listenerRegistered = false; @@ -706,6 +709,11 @@ public void reset(DataSourceMetadata dataSourceMetadata) notices.add(new ResetNotice(dataSourceMetadata)); } + public ReentrantLock getRecordSupplierLock() + { + return recordSupplierLock; + } + @VisibleForTesting public void tryInit() @@ -1889,10 +1897,9 @@ private boolean updatePartitionDataFromStream() { List previousPartitionIds = new ArrayList<>(partitionIds); Set partitionIdsFromSupplier; + recordSupplierLock.lock(); try { - synchronized (recordSupplierLock) { - partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream()); - } + partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream()); } catch (Exception e) { stateManager.recordThrowableEvent(e); @@ -1900,6 +1907,9 @@ private boolean updatePartitionDataFromStream() log.debug(e, "full stack trace"); return false; } + finally { + recordSupplierLock.unlock(); + } if (partitionIdsFromSupplier == null || partitionIdsFromSupplier.size() == 0) { String errMsg = StringUtils.format("No partitions found for stream [%s]", ioConfig.getStream()); @@ -1989,6 +1999,7 @@ private boolean updatePartitionDataFromStream() ); } + Int2ObjectMap> newlyDiscovered = new Int2ObjectLinkedOpenHashMap<>(); for 
(PartitionIdType partitionId : activePartitionsIdsFromSupplier) { int taskGroupId = getTaskGroupIdForPartition(partitionId); Set partitionGroup = partitionGroups.computeIfAbsent( @@ -1998,16 +2009,30 @@ private boolean updatePartitionDataFromStream() partitionGroup.add(partitionId); if (partitionOffsets.putIfAbsent(partitionId, getNotSetMarker()) == null) { - log.info( + log.debug( "New partition [%s] discovered for stream [%s], added to task group [%d]", partitionId, ioConfig.getStream(), taskGroupId ); + + newlyDiscovered.computeIfAbsent(taskGroupId, ArrayList::new).add(partitionId); + } + } + + if (newlyDiscovered.size() > 0) { + for (Int2ObjectMap.Entry> taskGroupPartitions : newlyDiscovered.int2ObjectEntrySet()) { + log.info( + "New partitions %s discovered for stream [%s], added to task group [%s]", + taskGroupPartitions.getValue(), + ioConfig.getStream(), + taskGroupPartitions.getIntKey() + ); } } if (!partitionIds.equals(previousPartitionIds)) { + assignRecordSupplierToPartitionIds(); // the set of partition IDs has changed, have any running tasks stop early so that we can adjust to the // repartitioning quickly by creating new tasks for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { @@ -2034,6 +2059,28 @@ private boolean updatePartitionDataFromStream() return true; } + private void assignRecordSupplierToPartitionIds() + { + recordSupplierLock.lock(); + try { + final Set partitions = partitionIds.stream() + .map(partitionId -> new StreamPartition<>(ioConfig.getStream(), partitionId)) + .collect(Collectors.toSet()); + if (!recordSupplier.getAssignment().containsAll(partitions)) { + recordSupplier.assign(partitions); + try { + recordSupplier.seekToEarliest(partitions); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + finally { + recordSupplierLock.unlock(); + } + } + /** * This method determines the set of expired partitions from the set of partitions currently returned by * the record supplier and the set of 
partitions previously tracked in the metadata. @@ -2106,6 +2153,7 @@ private void cleanupClosedAndExpiredPartitions( partitionIds.clear(); partitionIds.addAll(activePartitionsIdsFromSupplier); + assignRecordSupplierToPartitionIds(); for (Integer groupId : partitionGroups.keySet()) { if (newPartitionGroups.containsKey(groupId)) { @@ -2773,8 +2821,7 @@ protected Map> filter return startingOffsets; } - private void createNewTasks() - throws JsonProcessingException + private void createNewTasks() throws JsonProcessingException { // update the checkpoints in the taskGroup to latest ones so that new tasks do not read what is already published verifyAndMergeCheckpoints( @@ -2993,7 +3040,7 @@ private OrderedSequenceNumber getOffsetFromStorageForPartiti if (sequence == null) { throw new ISE("unable to fetch sequence number for partition[%s] from stream", partition); } - log.info("Getting sequence number [%s] for partition [%s]", sequence, partition); + log.debug("Getting sequence number [%s] for partition [%s]", sequence, partition); return makeSequenceNumber(sequence, false); } } @@ -3023,26 +3070,27 @@ && checkSourceMetadataMatch(dataSourceMetadata)) { return Collections.emptyMap(); } + /** + * Fetches the earliest or latest offset from the stream via the {@link RecordSupplier} + */ @Nullable private SequenceOffsetType getOffsetFromStreamForPartition(PartitionIdType partition, boolean useEarliestOffset) { - synchronized (recordSupplierLock) { + recordSupplierLock.lock(); + try { StreamPartition topicPartition = new StreamPartition<>(ioConfig.getStream(), partition); if (!recordSupplier.getAssignment().contains(topicPartition)) { - final Set partitions = Collections.singleton(topicPartition); - recordSupplier.assign(partitions); - try { - recordSupplier.seekToEarliest(partitions); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } + // this shouldn't happen, but in case it does... 
+ throw new IllegalStateException("Record supplier does not match current known partitions"); } return useEarliestOffset ? recordSupplier.getEarliestSequenceNumber(topicPartition) : recordSupplier.getLatestSequenceNumber(topicPartition); } + finally { + recordSupplierLock.unlock(); + } } private void createTasksForGroup(int groupId, int replicas) @@ -3098,16 +3146,24 @@ private void createTasksForGroup(int groupId, int replicas) } } + /** + * monitoring method, fetches current partition offsets and lag in a background reporting thread + */ @VisibleForTesting public void updateCurrentAndLatestOffsets() { - try { - updateCurrentOffsets(); - updateLatestOffsetsFromStream(); - sequenceLastUpdated = DateTimes.nowUtc(); - } - catch (Exception e) { - log.warn(e, "Exception while getting current/latest sequences"); + // if we aren't in a steady state, chill out for a bit, don't worry, we'll get called later, but if we aren't + // healthy, go ahead and try anyway, if possible, to provide insight into how much time is left to fix the + // issue for cluster operators since this feeds the lag metrics + if (stateManager.isSteadyState() || !stateManager.isHealthy()) { + try { + updateCurrentOffsets(); + updatePartitionLagFromStream(); + sequenceLastUpdated = DateTimes.nowUtc(); + } + catch (Exception e) { + log.warn(e, "Exception while getting current/latest sequences"); + } } } @@ -3136,34 +3192,7 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS); } - private void updateLatestOffsetsFromStream() throws InterruptedException - { - synchronized (recordSupplierLock) { - Set partitionIds; - try { - partitionIds = recordSupplier.getPartitionIds(ioConfig.getStream()); - } - catch (Exception e) { - log.warn("Could not fetch partitions for topic/stream [%s]", ioConfig.getStream()); - throw new StreamException(e); - } - - Set> partitions = partitionIds - .stream() -
.map(e -> new StreamPartition<>(ioConfig.getStream(), e)) - .collect(Collectors.toSet()); - - recordSupplier.assign(partitions); - recordSupplier.seekToLatest(partitions); - - updateLatestSequenceFromStream(recordSupplier, partitions); - } - } - - protected abstract void updateLatestSequenceFromStream( - RecordSupplier recordSupplier, - Set> partitions - ); + protected abstract void updatePartitionLagFromStream(); /** * Gets 'lag' of currently processed offset behind latest offset as a measure of difference between offsets. @@ -3179,17 +3208,21 @@ protected abstract void updateLatestSequenceFromStream( protected Map getHighestCurrentOffsets() { - if (!spec.isSuspended() || activelyReadingTaskGroups.size() > 0 || pendingCompletionTaskGroups.size() > 0) { - return activelyReadingTaskGroups - .values() - .stream() - .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()) - .flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream()) - .collect(Collectors.toMap( - Entry::getKey, - Entry::getValue, - (v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2 - )); + if (!spec.isSuspended()) { + if (activelyReadingTaskGroups.size() > 0 || pendingCompletionTaskGroups.size() > 0) { + return activelyReadingTaskGroups + .values() + .stream() + .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()) + .flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream()) + .collect(Collectors.toMap( + Entry::getKey, + Entry::getValue, + (v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? 
v1 : v2 + )); + } + // nothing is running but we are not suspended, so let's just hang out in case we get called while things start up + return ImmutableMap.of(); } else { // if supervisor is suspended, no tasks are likely running so use offsets in metadata, if exist return getOffsetsFromMetadataStorage(); @@ -3447,8 +3480,9 @@ private boolean checkOffsetAvailability( protected void emitLag() { - if (spec.isSuspended()) { - // don't emit metrics if supervisor is suspended (lag should still available in status report) + if (spec.isSuspended() || !stateManager.isSteadyState()) { + // don't emit metrics if supervisor is suspended or not in a healthy running state + // (lag should still be available in status report) return; } try { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 0d0ae0657a8c..54aa3beb6920 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -753,7 +753,6 @@ private void expectEmitterSupervisor(boolean suspended) throws EntryExistsExcept { spec = createMock(SeekableStreamSupervisorSpec.class); EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); - EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig( "stream", @@ -766,13 +765,20 @@ private void expectEmitterSupervisor(boolean suspended) throws EntryExistsExcept false, new Period("PT30M"), null, - null, null + null, + null ) { }).anyTimes(); EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); - EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig()).anyTimes(); + EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() { + @Override + public Duration getEmitterPeriod() + { + return new Period("PT1S").toStandardDuration(); + } + }).anyTimes(); EasyMock.expect(spec.isSuspended()).andReturn(suspended).anyTimes(); EasyMock.expect(spec.getType()).andReturn("test").anyTimes(); @@ -986,9 +992,7 @@ protected String baseTaskName() } @Override - protected void updateLatestSequenceFromStream( - RecordSupplier recordSupplier, Set> streamPartitions - ) + protected void updatePartitionLagFromStream() { // do nothing } @@ -1219,7 +1223,9 @@ protected Map getPartitionTimeLag() protected void emitLag() { super.emitLag(); - latch.countDown(); + if (stateManager.isSteadyState()) { + latch.countDown(); + } } @Override diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java index 76cf8c60927e..406d5d27b8f2 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java @@ -92,7 +92,7 @@ public boolean isFirstRunOnly() private final Deque recentEventsQueue = new ConcurrentLinkedDeque<>(); - private State supervisorState = BasicState.PENDING; + private volatile State supervisorState = BasicState.PENDING; private boolean atLeastOneSuccessfulRun = false; private boolean currentRunSuccessful = true; @@ -214,6 +214,11 @@ public boolean isHealthy() return supervisorState != null && supervisorState.isHealthy(); } + public boolean isSteadyState() + { + return healthySteadyState.equals(supervisorState); + } + public boolean isAtLeastOneSuccessfulRun() { 
return atLeastOneSuccessfulRun;