From 3100efc78c2edcedc123588ddc02f3a2c7ae9d7c Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Sun, 3 May 2020 21:38:41 -0700 Subject: [PATCH 1/7] refactor SeekableStreamSupervisor usage of RecordSupplier to reduce contention between background threads and main thread, refactor KinesisRecordSupplier, refactor Kinesis lag metric collection and emitting --- .../kafka/supervisor/KafkaSupervisor.java | 38 +++- .../indexing/kinesis/KinesisIndexTask.java | 5 +- .../kinesis/KinesisRecordSupplier.java | 146 +++++++++------- .../indexing/kinesis/KinesisSamplerSpec.java | 3 +- .../kinesis/supervisor/KinesisSupervisor.java | 40 ++++- .../kinesis/KinesisRecordSupplierTest.java | 34 ++-- .../supervisor/KinesisSupervisorTest.java | 102 +---------- .../supervisor/SeekableStreamSupervisor.java | 165 +++++++++++------- .../SeekableStreamSupervisorStateTest.java | 4 +- .../supervisor/SupervisorStateManager.java | 7 +- 10 files changed, 289 insertions(+), 255 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index ba12128ab1e1..89d20a255398 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -46,6 +46,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; @@ -336,16 +337,35 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() } @Override - protected void updateLatestSequenceFromStream( - RecordSupplier recordSupplier, - Set> partitions - ) + protected void updatePartitionLagFromStream() { - latestSequenceFromStream = partitions.stream() - .collect(Collectors.toMap( - StreamPartition::getPartitionId, - recordSupplier::getPosition - )); + getRecordSupplierLock().lock(); + try { + Set partitionIds; + try { + partitionIds = recordSupplier.getPartitionIds(getIoConfig().getStream()); + } + catch (Exception e) { + log.warn("Could not fetch partitions for topic/stream [%s]", getIoConfig().getStream()); + throw new StreamException(e); + } + + Set> partitions = partitionIds + .stream() + .map(e -> new StreamPartition<>(getIoConfig().getStream(), e)) + .collect(Collectors.toSet()); + + recordSupplier.seekToLatest(partitions); + + latestSequenceFromStream = + partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, recordSupplier::getPosition)); + } + catch (InterruptedException e) { + throw new StreamException(e); + } + finally { + getRecordSupplierLock().unlock(); + } } @Override diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index 5d5a307e4fdc..2d7e9bd503c8 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -97,7 +97,7 @@ protected KinesisRecordSupplier newTaskRecordSupplier() KinesisIndexTaskTuningConfig tuningConfig = ((KinesisIndexTaskTuningConfig) super.tuningConfig); int fetchThreads = tuningConfig.getFetchThreads() != 
null ? tuningConfig.getFetchThreads() - : Math.max(1, ioConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().size()); + : Runtime.getRuntime().availableProcessors() * 2; return new KinesisRecordSupplier( KinesisRecordSupplier.getAmazonKinesisClient( @@ -114,7 +114,8 @@ protected KinesisRecordSupplier newTaskRecordSupplier() tuningConfig.getRecordBufferOfferTimeout(), tuningConfig.getRecordBufferFullWait(), tuningConfig.getFetchSequenceNumberTimeout(), - tuningConfig.getMaxRecordsPerPoll() + tuningConfig.getMaxRecordsPerPoll(), + false ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index f4b78304d17c..ac1b249a132e 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -65,6 +65,7 @@ import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; import java.nio.ByteBuffer; +import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -78,11 +79,15 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -112,11 +117,10 @@ private class PartitionResource // to 
indicate that this shard has no more records to read @Nullable private volatile String shardIterator; - private volatile boolean started; - private volatile boolean stopRequested; - private volatile long currentLagMillis; + private final AtomicBoolean fetchStarted = new AtomicBoolean(); + PartitionResource(StreamPartition streamPartition) { this.streamPartition = streamPartition; @@ -124,30 +128,34 @@ private class PartitionResource void startBackgroundFetch() { - if (started) { + if (shardIterator == null) { + log.info( + "Skipping background fetch for stream[%s] partition[%s] since seek has not been called on the stream", + streamPartition.getStream(), + streamPartition.getPartitionId() + ); return; } + if (fetchStarted.compareAndSet(false, true)) { + log.info( + "Starting scheduled fetch runnable for stream[%s] partition[%s]", + streamPartition.getStream(), + streamPartition.getPartitionId() + ); - log.info( - "Starting scheduled fetch runnable for stream[%s] partition[%s]", - streamPartition.getStream(), - streamPartition.getPartitionId() - ); - - stopRequested = false; - started = true; - - rescheduleRunnable(fetchDelayMillis); + rescheduleRunnable(fetchDelayMillis); + } } void stopBackgroundFetch() { - log.info( - "Stopping scheduled fetch runnable for stream[%s] partition[%s]", - streamPartition.getStream(), - streamPartition.getPartitionId() - ); - stopRequested = true; + if (fetchStarted.compareAndSet(true, false)) { + log.info( + "Stopping scheduled fetch runnable for stream[%s] partition[%s]", + streamPartition.getStream(), + streamPartition.getPartitionId() + ); + } } long getPartitionTimeLag() @@ -158,15 +166,19 @@ long getPartitionTimeLag() long getPartitionTimeLag(String offset) { // if not started (fetching records in background), fetch lag ourself with a throw-away iterator - if (!started) { + if (!fetchStarted.get()) { try { final String iteratorType; final String offsetToUse; if (offset == null || KinesisSupervisor.NOT_SET.equals(offset)) { - // 
this should probably check if will start processing earliest or latest rather than assuming earliest - // if latest we could skip this because latest will not be behind latest so lag is 0. - iteratorType = ShardIteratorType.TRIM_HORIZON.toString(); - offsetToUse = null; + if (useEarliestSequenceNumber) { + iteratorType = ShardIteratorType.TRIM_HORIZON.toString(); + offsetToUse = null; + } else { + // if offset is not set and not using earliest, it means we will start reading from latest, + // so lag will be 0 and we have nothing to do here + return 0L; + } } else { iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString(); offsetToUse = offset; @@ -179,7 +191,7 @@ long getPartitionTimeLag(String offset) ).getShardIterator(); GetRecordsResult recordsResult = kinesis.getRecords( - new GetRecordsRequest().withShardIterator(shardIterator).withLimit(recordsPerFetch) + new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1) ); currentLagMillis = recordsResult.getMillisBehindLatest(); @@ -198,14 +210,16 @@ long getPartitionTimeLag(String offset) return currentLagMillis; } + private Callable> getLagCallable(String partitionId, String offset) + { + return () -> new AbstractMap.SimpleEntry<>(partitionId, this.getPartitionTimeLag(offset)); + } + private Runnable getRecordRunnable() { return () -> { - if (stopRequested) { - started = false; - stopRequested = false; - + if (!fetchStarted.get()) { log.info("Worker for partition[%s] has been stopped", streamPartition.getPartitionId()); return; } @@ -364,7 +378,7 @@ private Runnable getRecordRunnable() private void rescheduleRunnable(long delayMillis) { - if (started && !stopRequested) { + if (fetchStarted.get()) { try { scheduledExec.schedule(getRecordRunnable(), delayMillis, TimeUnit.MILLISECONDS); } @@ -398,6 +412,7 @@ private void rescheduleRunnable(long delayMillis) private final int maxRecordsPerPoll; private final int fetchThreads; private final int recordBufferSize; + private final boolean 
useEarliestSequenceNumber; private ScheduledExecutorService scheduledExec; @@ -405,8 +420,8 @@ private void rescheduleRunnable(long delayMillis) new ConcurrentHashMap<>(); private BlockingQueue> records; - private volatile boolean checkPartitionsStarted = false; private volatile boolean closed = false; + private AtomicBoolean partitionsFetchStarted = new AtomicBoolean(); public KinesisRecordSupplier( AmazonKinesis amazonKinesis, @@ -418,7 +433,8 @@ public KinesisRecordSupplier( int recordBufferOfferTimeout, int recordBufferFullWait, int fetchSequenceNumberTimeout, - int maxRecordsPerPoll + int maxRecordsPerPoll, + boolean useEarliestSequenceNumber ) { Preconditions.checkNotNull(amazonKinesis); @@ -432,6 +448,7 @@ public KinesisRecordSupplier( this.maxRecordsPerPoll = maxRecordsPerPoll; this.fetchThreads = fetchThreads; this.recordBufferSize = recordBufferSize; + this.useEarliestSequenceNumber = useEarliestSequenceNumber; // the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache. // The work around here is to use reflection to find the deaggregate function in the classpath. 
See details on the @@ -517,9 +534,8 @@ public static AmazonKinesis getAmazonKinesisClient( public void start() { checkIfClosed(); - if (checkPartitionsStarted) { + if (partitionsFetchStarted.compareAndSet(false, true)) { partitionResources.values().forEach(PartitionResource::startBackgroundFetch); - checkPartitionsStarted = false; } } @@ -535,15 +551,14 @@ public void assign(Set> collection) ) ); - for (Iterator, PartitionResource>> i = partitionResources.entrySet() - .iterator(); i.hasNext(); ) { + Iterator, PartitionResource>> i = partitionResources.entrySet().iterator(); + while (i.hasNext()) { Map.Entry, PartitionResource> entry = i.next(); if (!collection.contains(entry.getKey())) { i.remove(); entry.getValue().stopBackgroundFetch(); } } - } @Override @@ -581,9 +596,8 @@ public Collection> getAssignment() public List> poll(long timeout) { checkIfClosed(); - if (checkPartitionsStarted) { + if (partitionsFetchStarted.compareAndSet(false, true)) { partitionResources.values().forEach(PartitionResource::startBackgroundFetch); - checkPartitionsStarted = false; } try { @@ -701,12 +715,25 @@ Map getPartitionTimeLag() } public Map getPartitionTimeLag(Map currentOffsets) + throws InterruptedException, TimeoutException, ExecutionException { Map partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size()); - for (Map.Entry, PartitionResource> partition : partitionResources.entrySet()) { - final String partitionId = partition.getKey().getPartitionId(); - partitionLag.put(partitionId, partition.getValue().getPartitionTimeLag(currentOffsets.get(partitionId))); + + List>> longo = scheduledExec.invokeAll( + partitionResources.entrySet() + .stream() + .map(partition -> { + final String partitionId = partition.getKey().getPartitionId(); + return partition.getValue().getLagCallable(partitionId, currentOffsets.get(partitionId)); + }) + .collect(Collectors.toList()) + + ); + for (Future> future : longo) { + AbstractMap.SimpleEntry result = 
future.get(fetchSequenceNumberTimeout, TimeUnit.MILLISECONDS); + partitionLag.put(result.getKey(), result.getValue()); } + return partitionLag; } @@ -729,28 +756,28 @@ private void seekInternal(StreamPartition partition, String sequenceNumb iteratorEnum.toString(), sequenceNumber ).getShardIterator()); - - checkPartitionsStarted = true; } private void filterBufferAndResetFetchRunnable(Set> partitions) throws InterruptedException { - scheduledExec.shutdown(); + if (partitionsFetchStarted.compareAndSet(true, false)) { + scheduledExec.shutdown(); - try { - if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) { - scheduledExec.shutdownNow(); + try { + if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) { + scheduledExec.shutdownNow(); + } + } + catch (InterruptedException e) { + log.warn(e, "InterruptedException while shutting down"); + throw e; } - } - catch (InterruptedException e) { - log.warn(e, "InterruptedException while shutting down"); - throw e; - } - scheduledExec = Executors.newScheduledThreadPool( - fetchThreads, - Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d") - ); + scheduledExec = Executors.newScheduledThreadPool( + fetchThreads, + Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d") + ); + } // filter records in buffer and only retain ones whose partition was not seeked BlockingQueue> newQ = new LinkedBlockingQueue<>(recordBufferSize); @@ -762,8 +789,7 @@ private void filterBufferAndResetFetchRunnable(Set> part records = newQ; // restart fetching threads - partitionResources.values().forEach(x -> x.started = false); - checkPartitionsStarted = true; + partitionResources.values().forEach(x -> x.stopBackgroundFetch()); } @Nullable diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java index 
664b3b58b32d..401efe72c105 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java @@ -71,7 +71,8 @@ protected KinesisRecordSupplier createRecordSupplier() tuningConfig.getRecordBufferOfferTimeout(), tuningConfig.getRecordBufferFullWait(), tuningConfig.getFetchSequenceNumberTimeout(), - tuningConfig.getMaxRecordsPerPoll() + tuningConfig.getMaxRecordsPerPoll(), + ioConfig.isUseEarliestSequenceNumber() ); } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index c789fc7f3ac7..bf979f90267e 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -49,7 +49,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; -import org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; @@ -64,6 +64,9 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import 
java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; /** * Supervisor responsible for managing the KinesisIndexTask for a single dataSource. At a high level, the class accepts a @@ -210,7 +213,8 @@ protected RecordSupplier setupRecordSupplier() taskTuningConfig.getRecordBufferOfferTimeout(), taskTuningConfig.getRecordBufferFullWait(), taskTuningConfig.getFetchSequenceNumberTimeout(), - taskTuningConfig.getMaxRecordsPerPoll() + taskTuningConfig.getMaxRecordsPerPoll(), + ioConfig.isUseEarliestSequenceNumber() ); } @@ -298,7 +302,19 @@ protected Map getRecordLagPerPartition(Map current @Override protected Map getTimeLagPerPartition(Map currentOffsets) { - return ((KinesisRecordSupplier) recordSupplier).getPartitionTimeLag(currentOffsets); + return currentOffsets + .entrySet() + .stream() + .filter(e -> e.getValue() != null && + currentPartitionTimeLag != null && + currentPartitionTimeLag.get(e.getKey()) != null + ) + .collect( + Collectors.toMap( + Map.Entry::getKey, + e -> currentPartitionTimeLag.get(e.getKey()) + ) + ); } @Override @@ -317,13 +333,19 @@ protected OrderedSequenceNumber makeSequenceNumber(String seq, boolean i } @Override - protected void updateLatestSequenceFromStream( - RecordSupplier recordSupplier, - Set> streamPartitions - ) + protected void updatePartitionLagFromStream() { - KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier; - currentPartitionTimeLag = supplier.getPartitionTimeLag(getHighestCurrentOffsets()); + getRecordSupplierLock().lock(); + try { + KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier; + currentPartitionTimeLag = supplier.getPartitionTimeLag(getHighestCurrentOffsets()); + } + catch (InterruptedException | TimeoutException | ExecutionException e) { + throw new StreamException(e); + } + finally { + getRecordSupplierLock().unlock(); + } } @Override diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index 4a631e84fa66..b91a37d4385c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -50,6 +50,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; public class KinesisRecordSupplierTest extends EasyMockSupport @@ -203,7 +205,8 @@ public void testSupplierSetup() 5000, 5000, 60000, - 5 + 5, + true ); Assert.assertTrue(recordSupplier.getAssignment().isEmpty()); @@ -212,6 +215,9 @@ public void testSupplierSetup() Assert.assertEquals(partitions, recordSupplier.getAssignment()); Assert.assertEquals(ImmutableSet.of(SHARD_ID1, SHARD_ID0), recordSupplier.getPartitionIds(STREAM)); + + // calling poll would start background fetch if seek was called, but will instead be skipped and the results + // empty Assert.assertEquals(Collections.emptyList(), recordSupplier.poll(100)); verifyAll(); @@ -290,7 +296,8 @@ public void testPoll() throws InterruptedException 5000, 5000, 60000, - 100 + 100, + true ); recordSupplier.assign(partitions); @@ -367,7 +374,8 @@ public void testSeek() 5000, 5000, 60000, - 100 + 100, + true ); recordSupplier.assign(partitions); @@ -434,7 +442,8 @@ public void testSeekToLatest() 5000, 5000, 60000, - 100 + 100, + true ); recordSupplier.assign(partitions); @@ -468,7 +477,8 @@ public void testSeekUnassigned() throws InterruptedException 5000, 5000, 60000, - 5 + 5, + true ); recordSupplier.assign(partitions); @@ -530,7 +540,8 @@ public void testPollAfterSeek() 
5000, 5000, 60000, - 1 + 1, + true ); recordSupplier.assign(partitions); @@ -622,7 +633,8 @@ public void testPollDeaggregate() throws InterruptedException 5000, 5000, 60000, - 100 + 100, + true ); recordSupplier.assign(partitions); @@ -692,13 +704,14 @@ private KinesisRecordSupplier getSequenceNumberWhenShardIsEmptyShouldReturnsNull 5000, 5000, 1000, - 100 + 100, + true ); return recordSupplier; } @Test - public void getPartitionTimeLag() throws InterruptedException + public void getPartitionTimeLag() throws InterruptedException, TimeoutException, ExecutionException { EasyMock.expect(kinesis.getShardIterator( EasyMock.anyObject(), @@ -749,7 +762,8 @@ public void getPartitionTimeLag() throws InterruptedException 5000, 5000, 60000, - 100 + 100, + true ); recordSupplier.assign(partitions); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 94ca6bf693b0..4f1261a61a03 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -107,7 +107,9 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; public class KinesisSupervisorTest extends EasyMockSupport { @@ -128,7 +130,6 @@ public class KinesisSupervisorTest extends EasyMockSupport private static final StreamPartition SHARD0_PARTITION = StreamPartition.of(STREAM, SHARD_ID0); private static final StreamPartition SHARD1_PARTITION = StreamPartition.of(STREAM, SHARD_ID1); private static final StreamPartition 
SHARD2_PARTITION = StreamPartition.of(STREAM, SHARD_ID2); - private static final Map TIME_LAG = ImmutableMap.of(SHARD_ID1, 9000L, SHARD_ID0, 1234L); private static DataSchema dataSchema; private KinesisRecordSupplier supervisorRecordSupplier; @@ -231,9 +232,6 @@ public void testNoInitialState() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -301,9 +299,6 @@ public void testMultiTask() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -363,9 +358,6 @@ public void testReplicas() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -443,9 +435,6 @@ public void 
testLateMessageRejectionPeriod() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -498,9 +487,6 @@ public void testEarlyMessageRejectionPeriod() throws Exception EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -560,9 +546,6 @@ public void testDatasourceMetadata() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -660,9 +643,6 @@ public void testDontKillTasksWithMismatchedType() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); 
EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); // non KinesisIndexTask (don't kill) Task id2 = new RealtimeIndexTask( @@ -729,9 +709,6 @@ public void testKillBadPartitionAssignment() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -844,9 +821,6 @@ public void testRequeueTaskWhenFailed() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -958,9 +932,6 @@ public void testRequeueAdoptedTaskWhenFailed() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); DateTime now = DateTimes.nowUtc(); DateTime maxi = now.plusMinutes(60); @@ -1098,9 +1069,6 @@ public void testQueueNextTasksOnSuccess() throws Exception 
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -1227,9 +1195,6 @@ public void testBeginPublishAndQueueNextTasks() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); final Capture firstTasks = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -1452,9 +1417,6 @@ public void testDiscoverExistingPublishingTask() throws Exception Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState()); Assert.assertEquals(0, payload.getRecentErrors().size()); - Assert.assertEquals(timeLag, payload.getMinimumLagMillis()); - Assert.assertEquals(20000000L, (long) payload.getAggregateLagMillis()); - TaskReportData publishingReport = payload.getPublishingTasks().get(0); Assert.assertEquals("id1", publishingReport.getId()); @@ -1604,9 +1566,6 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() Assert.assertEquals(1, payload.getPublishingTasks().size()); Assert.assertEquals(SupervisorStateManager.BasicState.RUNNING, payload.getDetailedState()); Assert.assertEquals(0, payload.getRecentErrors().size()); - Assert.assertEquals(timeLag, payload.getMinimumLagMillis()); - 
Assert.assertEquals(9000L + 1234L, (long) payload.getAggregateLagMillis()); - TaskReportData publishingReport = payload.getPublishingTasks().get(0); @@ -1858,9 +1817,6 @@ public void testKillUnresponsiveTasksWhileGettingStartTime() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -1943,9 +1899,6 @@ public void testKillUnresponsiveTasksWhilePausing() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -2055,10 +2008,6 @@ public void testKillUnresponsiveTasksWhileSettingEndOffsets() throws Exception supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); - Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); @@ -2206,9 +2155,6 @@ public void testStopGracefully() throws Exception 
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -2617,9 +2563,6 @@ public void testResetRunningTasks() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -2846,9 +2789,6 @@ public void testNoDataIngestionTasks() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); @@ -2911,7 +2851,7 @@ public void testNoDataIngestionTasks() throws Exception @Test(timeout = 60_000L) - public void testCheckpointForInactiveTaskGroup() throws InterruptedException + public void testCheckpointForInactiveTaskGroup() throws InterruptedException, TimeoutException, ExecutionException { supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false); //not adding any events @@ -3004,9 +2944,6 @@ public void testCheckpointForInactiveTaskGroup() throws InterruptedException 
EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); EasyMock.expect(taskRunner.getRunningTasks()).andReturn(workItems).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -3260,9 +3197,6 @@ public void testSuspendedRunningTasks() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task id1 = createKinesisIndexTask( "id1", @@ -3481,7 +3415,8 @@ public void testGetCurrentTotalStats() } @Test - public void testDoNotKillCompatibleTasks() throws InterruptedException, EntryExistsException + public void testDoNotKillCompatibleTasks() + throws InterruptedException, EntryExistsException, TimeoutException, ExecutionException { // This supervisor always returns true for isTaskCurrent -> it should not kill its tasks int numReplicas = 2; @@ -3511,9 +3446,6 @@ public void testDoNotKillCompatibleTasks() throws InterruptedException, EntryExi EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task task = createKinesisIndexTask( "id2", @@ -3581,7 +3513,8 @@ public void testDoNotKillCompatibleTasks() throws InterruptedException, 
EntryExi } @Test - public void testKillIncompatibleTasks() throws InterruptedException, EntryExistsException + public void testKillIncompatibleTasks() + throws InterruptedException, EntryExistsException, TimeoutException, ExecutionException { // This supervisor always returns false for isTaskCurrent -> it should kill its tasks int numReplicas = 2; @@ -3610,9 +3543,6 @@ public void testKillIncompatibleTasks() throws InterruptedException, EntryExists EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Task task = createKinesisIndexTask( "id1", @@ -3855,9 +3785,6 @@ private List testShardSplitPhaseOne() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -3966,9 +3893,6 @@ private List testShardSplitPhaseTwo(List phaseOneTasks) throws Excep supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture postSplitCaptured = Capture.newInstance(CaptureType.ALL); @@ -4144,9 +4068,6 @@ private void testShardSplitPhaseThree(List phaseTwoTasks) throws Exception supervisorRecordSupplier.seek(EasyMock.anyObject(), 
EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture postSplitCaptured = Capture.newInstance(CaptureType.ALL); @@ -4310,9 +4231,6 @@ private List testShardMergePhaseOne() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(EasyMock.anyObject())).andReturn("100").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture captured = Capture.newInstance(CaptureType.ALL); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); @@ -4432,9 +4350,6 @@ private List testShardMergePhaseTwo(List phaseOneTasks) throws Excep supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture postMergeCaptured = Capture.newInstance(CaptureType.ALL); @@ -4589,9 +4504,6 @@ private void testShardMergePhaseThree(List phaseTwoTasks) throws Exception supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) - .andReturn(TIME_LAG) - .atLeastOnce(); Capture postSplitCaptured = Capture.newInstance(CaptureType.ALL); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 247ff0e00c10..35a60bdf58f6 100644 --- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -37,6 +37,8 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import org.apache.commons.codec.digest.DigestUtils; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskState; @@ -108,6 +110,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -479,7 +482,7 @@ boolean isValidTaskGroup(int taskGroupId, @Nullable TaskGroup taskGroup) private final BlockingQueue notices = new LinkedBlockingDeque<>(); private final Object stopLock = new Object(); private final Object stateChangeLock = new Object(); - private final Object recordSupplierLock = new Object(); + private final ReentrantLock recordSupplierLock = new ReentrantLock(); private final boolean useExclusiveStartingSequence; private boolean listenerRegistered = false; @@ -706,6 +709,11 @@ public void reset(DataSourceMetadata dataSourceMetadata) notices.add(new ResetNotice(dataSourceMetadata)); } + public ReentrantLock getRecordSupplierLock() + { + return recordSupplierLock; + } + @VisibleForTesting public void tryInit() @@ -1889,10 +1897,9 @@ private boolean updatePartitionDataFromStream() { List previousPartitionIds = new ArrayList<>(partitionIds); Set partitionIdsFromSupplier; + recordSupplierLock.lock(); try { - synchronized (recordSupplierLock) { - 
partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream()); - } + partitionIdsFromSupplier = recordSupplier.getPartitionIds(ioConfig.getStream()); } catch (Exception e) { stateManager.recordThrowableEvent(e); @@ -1900,6 +1907,9 @@ private boolean updatePartitionDataFromStream() log.debug(e, "full stack trace"); return false; } + finally { + recordSupplierLock.unlock(); + } if (partitionIdsFromSupplier == null || partitionIdsFromSupplier.size() == 0) { String errMsg = StringUtils.format("No partitions found for stream [%s]", ioConfig.getStream()); @@ -1989,6 +1999,7 @@ private boolean updatePartitionDataFromStream() ); } + Int2ObjectMap> newlyDiscovered = new Int2ObjectLinkedOpenHashMap<>(); for (PartitionIdType partitionId : activePartitionsIdsFromSupplier) { int taskGroupId = getTaskGroupIdForPartition(partitionId); Set partitionGroup = partitionGroups.computeIfAbsent( @@ -1998,16 +2009,36 @@ private boolean updatePartitionDataFromStream() partitionGroup.add(partitionId); if (partitionOffsets.putIfAbsent(partitionId, getNotSetMarker()) == null) { - log.info( + log.debug( "New partition [%s] discovered for stream [%s], added to task group [%d]", partitionId, ioConfig.getStream(), taskGroupId ); + + newlyDiscovered.compute(taskGroupId, (groupId, partitions) -> { + if (partitions == null) { + partitions = new ArrayList<>(); + } + partitions.add(partitionId); + return partitions; + }); + newlyDiscovered.get(taskGroupId).add(partitionId); } } + if (newlyDiscovered.size() > 0) { + for (Int2ObjectMap.Entry> entry : newlyDiscovered.int2ObjectEntrySet()) + log.info( + "New partitions %s discovered for stream [%s], added to task group [%s]", + entry.getValue(), + ioConfig.getStream(), + entry.getIntKey() + ); + } + if (!partitionIds.equals(previousPartitionIds)) { + assignRecordSupplierToPartitionIds(); // the set of partition IDs has changed, have any running tasks stop early so that we can adjust to the // repartitioning quickly by creating new tasks 
for (TaskGroup taskGroup : activelyReadingTaskGroups.values()) { @@ -2034,6 +2065,28 @@ private boolean updatePartitionDataFromStream() return true; } + private void assignRecordSupplierToPartitionIds() + { + recordSupplierLock.lock(); + try { + final Set partitions = partitionIds.stream() + .map(partitionId -> new StreamPartition<>(ioConfig.getStream(), partitionId)) + .collect(Collectors.toSet()); + if (!recordSupplier.getAssignment().containsAll(partitions)) { + recordSupplier.assign(partitions); + try { + recordSupplier.seekToEarliest(partitions); + } + catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + finally { + recordSupplierLock.unlock(); + } + } + /** * This method determines the set of expired partitions from the set of partitions currently returned by * the record supplier and the set of partitions previously tracked in the metadata. @@ -2106,6 +2159,7 @@ private void cleanupClosedAndExpiredPartitions( partitionIds.clear(); partitionIds.addAll(activePartitionsIdsFromSupplier); + assignRecordSupplierToPartitionIds(); for (Integer groupId : partitionGroups.keySet()) { if (newPartitionGroups.containsKey(groupId)) { @@ -2773,8 +2827,7 @@ protected Map> filter return startingOffsets; } - private void createNewTasks() - throws JsonProcessingException + private void createNewTasks() throws JsonProcessingException { // update the checkpoints in the taskGroup to latest ones so that new tasks do not read what is already published verifyAndMergeCheckpoints( @@ -2993,7 +3046,7 @@ private OrderedSequenceNumber getOffsetFromStorageForPartiti if (sequence == null) { throw new ISE("unable to fetch sequence number for partition[%s] from stream", partition); } - log.info("Getting sequence number [%s] for partition [%s]", sequence, partition); + log.debug("Getting sequence number [%s] for partition [%s]", sequence, partition); return makeSequenceNumber(sequence, false); } } @@ -3026,23 +3079,21 @@ && 
checkSourceMetadataMatch(dataSourceMetadata)) { @Nullable private SequenceOffsetType getOffsetFromStreamForPartition(PartitionIdType partition, boolean useEarliestOffset) { - synchronized (recordSupplierLock) { + recordSupplierLock.lock(); + try { StreamPartition topicPartition = new StreamPartition<>(ioConfig.getStream(), partition); if (!recordSupplier.getAssignment().contains(topicPartition)) { - final Set partitions = Collections.singleton(topicPartition); - recordSupplier.assign(partitions); - try { - recordSupplier.seekToEarliest(partitions); - } - catch (InterruptedException e) { - throw new RuntimeException(e); - } + // this shouldn't happen, but in case it does... + throw new IllegalStateException("Record supplier does not match current known partitions"); } return useEarliestOffset ? recordSupplier.getEarliestSequenceNumber(topicPartition) : recordSupplier.getLatestSequenceNumber(topicPartition); } + finally { + recordSupplierLock.unlock(); + } } private void createTasksForGroup(int groupId, int replicas) @@ -3098,16 +3149,22 @@ private void createTasksForGroup(int groupId, int replicas) } } + /** + * monitoring method, fetches current partition offsets and lag in a background reporting thread + */ @VisibleForTesting public void updateCurrentAndLatestOffsets() { - try { - updateCurrentOffsets(); - updateLatestOffsetsFromStream(); - sequenceLastUpdated = DateTimes.nowUtc(); - } - catch (Exception e) { - log.warn(e, "Exception while getting current/latest sequences"); + // if we aren't in a steady state, chill out for a bit, don't worry, we'll get called later + if (stateManager.isSteadyState()) { + try { + updateCurrentOffsets(); + updatePartitionLagFromStream(); + sequenceLastUpdated = DateTimes.nowUtc(); + } + catch (Exception e) { + log.warn(e, "Exception while getting current/latest sequences"); + } } } @@ -3136,34 +3193,7 @@ private void updateCurrentOffsets() throws InterruptedException, ExecutionExcept 
Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS); } - private void updateLatestOffsetsFromStream() throws InterruptedException - { - synchronized (recordSupplierLock) { - Set partitionIds; - try { - partitionIds = recordSupplier.getPartitionIds(ioConfig.getStream()); - } - catch (Exception e) { - log.warn("Could not fetch partitions for topic/stream [%s]", ioConfig.getStream()); - throw new StreamException(e); - } - - Set> partitions = partitionIds - .stream() - .map(e -> new StreamPartition<>(ioConfig.getStream(), e)) - .collect(Collectors.toSet()); - - recordSupplier.assign(partitions); - recordSupplier.seekToLatest(partitions); - - updateLatestSequenceFromStream(recordSupplier, partitions); - } - } - - protected abstract void updateLatestSequenceFromStream( - RecordSupplier recordSupplier, - Set> partitions - ); + protected abstract void updatePartitionLagFromStream(); /** * Gets 'lag' of currently processed offset behind latest offset as a measure of difference between offsets. @@ -3179,17 +3209,21 @@ protected abstract void updateLatestSequenceFromStream( protected Map getHighestCurrentOffsets() { - if (!spec.isSuspended() || activelyReadingTaskGroups.size() > 0 || pendingCompletionTaskGroups.size() > 0) { - return activelyReadingTaskGroups - .values() - .stream() - .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()) - .flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream()) - .collect(Collectors.toMap( - Entry::getKey, - Entry::getValue, - (v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? 
v1 : v2 - )); + if (!spec.isSuspended()) { + if (activelyReadingTaskGroups.size() > 0 || pendingCompletionTaskGroups.size() > 0) { + return activelyReadingTaskGroups + .values() + .stream() + .flatMap(taskGroup -> taskGroup.tasks.entrySet().stream()) + .flatMap(taskData -> taskData.getValue().currentSequences.entrySet().stream()) + .collect(Collectors.toMap( + Entry::getKey, + Entry::getValue, + (v1, v2) -> makeSequenceNumber(v1).compareTo(makeSequenceNumber(v2)) > 0 ? v1 : v2 + )); + } + // nothing is running but we are not suspended, so lets just hang out in case we get called while things start up + return ImmutableMap.of(); } else { // if supervisor is suspended, no tasks are likely running so use offsets in metadata, if exist return getOffsetsFromMetadataStorage(); @@ -3447,8 +3481,9 @@ private boolean checkOffsetAvailability( protected void emitLag() { - if (spec.isSuspended()) { - // don't emit metrics if supervisor is suspended (lag should still available in status report) + if (spec.isSuspended() || !stateManager.isSteadyState()) { + // don't emit metrics if supervisor is suspended or not in a healthy running state + // (lag should still available in status report) return; } try { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index d34bbbbec824..8ed87cb56039 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -986,9 +986,7 @@ protected String baseTaskName() } @Override - protected void updateLatestSequenceFromStream( - RecordSupplier recordSupplier, Set> streamPartitions - ) + protected void updatePartitionLagFromStream() { // do nothing 
} diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java index 76cf8c60927e..406d5d27b8f2 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java @@ -92,7 +92,7 @@ public boolean isFirstRunOnly() private final Deque recentEventsQueue = new ConcurrentLinkedDeque<>(); - private State supervisorState = BasicState.PENDING; + private volatile State supervisorState = BasicState.PENDING; private boolean atLeastOneSuccessfulRun = false; private boolean currentRunSuccessful = true; @@ -214,6 +214,11 @@ public boolean isHealthy() return supervisorState != null && supervisorState.isHealthy(); } + public boolean isSteadyState() + { + return healthySteadyState.equals(supervisorState); + } + public boolean isAtLeastOneSuccessfulRun() { return atLeastOneSuccessfulRun; From b88703d420a962fab9a99ea33b15543c8bbb4b45 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 4 May 2020 01:16:14 -0700 Subject: [PATCH 2/7] fix style and test --- .../supervisor/KinesisSupervisorTest.java | 8 ++-- .../supervisor/SeekableStreamSupervisor.java | 15 +++--- .../SeekableStreamSupervisorStateTest.java | 46 +++++++++++-------- 3 files changed, 39 insertions(+), 30 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 4f1261a61a03..5c9431b59e6f 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -107,9 +107,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.TimeoutException; public class KinesisSupervisorTest extends EasyMockSupport { @@ -2851,7 +2849,7 @@ public void testNoDataIngestionTasks() throws Exception @Test(timeout = 60_000L) - public void testCheckpointForInactiveTaskGroup() throws InterruptedException, TimeoutException, ExecutionException + public void testCheckpointForInactiveTaskGroup() throws InterruptedException { supervisor = getTestableSupervisor(2, 1, true, "PT1S", null, null, false); //not adding any events @@ -3416,7 +3414,7 @@ public void testGetCurrentTotalStats() @Test public void testDoNotKillCompatibleTasks() - throws InterruptedException, EntryExistsException, TimeoutException, ExecutionException + throws InterruptedException, EntryExistsException { // This supervisor always returns true for isTaskCurrent -> it should not kill its tasks int numReplicas = 2; @@ -3514,7 +3512,7 @@ public void testDoNotKillCompatibleTasks() @Test public void testKillIncompatibleTasks() - throws InterruptedException, EntryExistsException, TimeoutException, ExecutionException + throws InterruptedException, EntryExistsException { // This supervisor always returns false for isTaskCurrent -> it should kill its tasks int numReplicas = 2; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 35a60bdf58f6..57c74a7d5464 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2028,13 +2028,14 @@ private boolean updatePartitionDataFromStream() } if (newlyDiscovered.size() > 0) { - for (Int2ObjectMap.Entry> entry : newlyDiscovered.int2ObjectEntrySet()) - log.info( - "New partitions %s discovered for stream [%s], added to task group [%s]", - entry.getValue(), - ioConfig.getStream(), - entry.getIntKey() - ); + for (Int2ObjectMap.Entry> taskGroupPartitions : newlyDiscovered.int2ObjectEntrySet()) { + log.info( + "New partitions %s discovered for stream [%s], added to task group [%s]", + taskGroupPartitions.getValue(), + ioConfig.getStream(), + taskGroupPartitions.getIntKey() + ); + } } if (!partitionIds.equals(previousPartitionIds)) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 8ed87cb56039..e451836f08cf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -753,26 +753,34 @@ private void expectEmitterSupervisor(boolean suspended) throws EntryExistsExcept { spec = createMock(SeekableStreamSupervisorSpec.class); EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); - EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); - EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig( - "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of()), - 1, - 1, - new Period("PT1H"), - new Period("PT1S"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, null - ) - { - 
}).anyTimes(); + EasyMock.expect(spec.getIoConfig()) + .andReturn( + new SeekableStreamSupervisorIOConfig( + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of()), + 1, + 1, + new Period("PT1H"), + new Period("PT1S"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, null + ) + { + } + ).anyTimes(); EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); - EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig()).anyTimes(); + EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(new DruidMonitorSchedulerConfig() { + @Override + public Duration getEmitterPeriod() + { + return new Period("PT1S").toStandardDuration(); + } + }).anyTimes(); EasyMock.expect(spec.isSuspended()).andReturn(suspended).anyTimes(); EasyMock.expect(spec.getType()).andReturn("test").anyTimes(); @@ -1219,7 +1227,9 @@ protected Map getPartitionTimeLag() protected void emitLag() { super.emitLag(); - latch.countDown(); + if (stateManager.isSteadyState()) { + latch.countDown(); + } } @Override From b34a585f996057a9fe360cd9eae639d771ddf22f Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Tue, 5 May 2020 04:52:10 -0700 Subject: [PATCH 3/7] cleanup, refactor, javadocs, test --- .../org/apache/druid/indexer/TaskIdUtils.java | 8 + .../kafka/supervisor/KafkaSupervisor.java | 3 +- .../indexing/kinesis/KinesisIndexTask.java | 5 + .../kinesis/KinesisRecordSupplier.java | 600 +++++++++--------- .../kinesis/supervisor/KinesisSupervisor.java | 33 +- .../kinesis/KinesisIndexTaskTest.java | 2 +- .../kinesis/KinesisRecordSupplierTest.java | 38 +- .../supervisor/KinesisSupervisorTest.java | 68 +- .../supervisor/SeekableStreamSupervisor.java | 3 + 9 files changed, 420 insertions(+), 340 deletions(-) diff --git a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java 
b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java index a88341b08339..80d48cec08e5 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java @@ -19,6 +19,7 @@ package org.apache.druid.indexer; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.druid.java.util.common.StringUtils; @@ -31,6 +32,8 @@ public class TaskIdUtils { private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*"); + private static final Joiner underscoreJoiner = Joiner.on("_"); + public static void validateId(String thingToValidate, String stringToValidate) { Preconditions.checkArgument( @@ -60,4 +63,9 @@ public static String getRandomId() } return suffix.toString(); } + + public static String getRandomIdWithPrefix(String prefix) + { + return underscoreJoiner.join(prefix, TaskIdUtils.getRandomId()); + } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 89d20a255398..2fc2edcdf429 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; import org.apache.druid.indexer.TaskIdUtils; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.Task; @@ -222,7 +221,7 @@ protected List> createIndexTasks( List> taskList = new 
ArrayList<>(); for (int i = 0; i < replicas; i++) { - String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId()); + String taskId = TaskIdUtils.getRandomIdWithPrefix(baseSequenceName); taskList.add(new KafkaIndexTask( taskId, new TaskResource(baseSequenceName, 1), diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index 2d7e9bd503c8..bfd375830497 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.inject.name.Named; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; @@ -99,6 +100,10 @@ protected KinesisRecordSupplier newTaskRecordSupplier() ? 
tuningConfig.getFetchThreads() : Runtime.getRuntime().availableProcessors() * 2; + Preconditions.checkArgument( + fetchThreads > 0, + "Must have at least one background fetch thread for the record supplier" + ); return new KinesisRecordSupplier( KinesisRecordSupplier.getAmazonKinesisClient( ioConfig.getEndpoint(), diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index ac1b249a132e..133d4cb1158e 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -65,7 +65,6 @@ import java.lang.invoke.MethodHandles; import java.lang.reflect.Method; import java.nio.ByteBuffer; -import java.util.AbstractMap; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -79,14 +78,11 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -99,6 +95,7 @@ public class KinesisRecordSupplier implements RecordSupplier private static final EmittingLogger log = new EmittingLogger(KinesisRecordSupplier.class); private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000; private static final long EXCEPTION_RETRY_DELAY_MS = 10000; + 
private static final int FETCH_SEQUENCE_NUMBER_RECORD_COUNT = 1000; private static boolean isServiceExceptionRecoverable(AmazonServiceException ex) { @@ -107,6 +104,37 @@ private static boolean isServiceExceptionRecoverable(AmazonServiceException ex) return isIOException || isTimeout; } + /** + * Returns an array with the content between the position and limit of "buffer". This may be the buffer's backing + * array itself. Does not modify position or limit of the buffer. + */ + private static byte[] toByteArray(final ByteBuffer buffer) + { + if (buffer.hasArray() + && buffer.arrayOffset() == 0 + && buffer.position() == 0 + && buffer.array().length == buffer.limit()) { + return buffer.array(); + } else { + final byte[] retVal = new byte[buffer.remaining()]; + buffer.duplicate().get(retVal); + return retVal; + } + } + + /** + * Catch any exception and wrap it in a {@link StreamException} + */ + private static T wrapExceptions(Callable callable) + { + try { + return callable.call(); + } + catch (Exception e) { + throw new StreamException(e); + } + } + private class PartitionResource { private final StreamPartition streamPartition; @@ -121,16 +149,20 @@ private class PartitionResource private final AtomicBoolean fetchStarted = new AtomicBoolean(); - PartitionResource(StreamPartition streamPartition) + private PartitionResource(StreamPartition streamPartition) { this.streamPartition = streamPartition; } - void startBackgroundFetch() + private void startBackgroundFetch() { + if (!backgroundFetchEnabled) { + return; + } + // if seek has been called if (shardIterator == null) { - log.info( - "Skipping background fetch for stream[%s] partition[%s] since seek has not been called on the stream", + log.warn( + "Skipping background fetch for stream[%s] partition[%s] since seek has not been called for this partition", streamPartition.getStream(), streamPartition.getPartitionId() ); @@ -138,87 +170,49 @@ void startBackgroundFetch() } if (fetchStarted.compareAndSet(false, true)) { 
log.info( - "Starting scheduled fetch runnable for stream[%s] partition[%s]", + "Starting scheduled fetch for stream[%s] partition[%s]", streamPartition.getStream(), streamPartition.getPartitionId() ); - rescheduleRunnable(fetchDelayMillis); + scheduleBackgroundFetch(fetchDelayMillis); } } - void stopBackgroundFetch() + private void stopBackgroundFetch() { if (fetchStarted.compareAndSet(true, false)) { log.info( - "Stopping scheduled fetch runnable for stream[%s] partition[%s]", + "Stopping scheduled fetch for stream[%s] partition[%s]", streamPartition.getStream(), streamPartition.getPartitionId() ); } } - long getPartitionTimeLag() - { - return currentLagMillis; - } - - long getPartitionTimeLag(String offset) + private void scheduleBackgroundFetch(long delayMillis) { - // if not started (fetching records in background), fetch lag ourself with a throw-away iterator - if (!fetchStarted.get()) { + if (fetchStarted.get()) { try { - final String iteratorType; - final String offsetToUse; - if (offset == null || KinesisSupervisor.NOT_SET.equals(offset)) { - if (useEarliestSequenceNumber) { - iteratorType = ShardIteratorType.TRIM_HORIZON.toString(); - offsetToUse = null; - } else { - // if offset is not set and not using earliest, it means we will start reading from latest, - // so lag will be 0 and we have nothing to do here - return 0L; - } - } else { - iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString(); - offsetToUse = offset; - } - String shardIterator = kinesis.getShardIterator( - streamPartition.getStream(), - streamPartition.getPartitionId(), - iteratorType, - offsetToUse - ).getShardIterator(); - - GetRecordsResult recordsResult = kinesis.getRecords( - new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1) - ); - - currentLagMillis = recordsResult.getMillisBehindLatest(); - return currentLagMillis; + scheduledExec.schedule(fetchRecords(), delayMillis, TimeUnit.MILLISECONDS); } - catch (Exception ex) { - // eat it + catch 
(RejectedExecutionException e) { log.warn( - ex, - "Failed to determine partition lag for partition %s of stream %s", - streamPartition.getPartitionId(), - streamPartition.getStream() + e, + "Caught RejectedExecutionException, KinesisRecordSupplier for partition[%s] has likely temporarily shutdown the ExecutorService. " + + "This is expected behavior after calling seek(), seekToEarliest() and seekToLatest()", + streamPartition.getPartitionId() ); + } + } else { + log.info("Worker for partition[%s] has been stopped", streamPartition.getPartitionId()); } - return currentLagMillis; } - private Callable> getLagCallable(String partitionId, String offset) - { - return () -> new AbstractMap.SimpleEntry<>(partitionId, this.getPartitionTimeLag(offset)); - } - - private Runnable getRecordRunnable() + private Runnable fetchRecords() { return () -> { - if (!fetchStarted.get()) { log.info("Worker for partition[%s] has been stopped", streamPartition.getPartitionId()); return; @@ -245,7 +239,7 @@ private Runnable getRecordRunnable() if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) { log.warn("OrderedPartitionableRecord buffer full, retrying in [%,dms]", recordBufferFullWait); - rescheduleRunnable(recordBufferFullWait); + scheduleBackgroundFetch(recordBufferFullWait); } return; @@ -312,14 +306,14 @@ private Runnable getRecordRunnable() currRecord.getSequenceNumber() ).getShardIterator(); - rescheduleRunnable(recordBufferFullWait); + scheduleBackgroundFetch(recordBufferFullWait); return; } } shardIterator = recordsResult.getNextShardIterator(); // will be null if the shard has been closed - rescheduleRunnable(fetchDelayMillis); + scheduleBackgroundFetch(fetchDelayMillis); } catch (ProvisionedThroughputExceededException e) { log.warn( @@ -329,7 +323,7 @@ private Runnable getRecordRunnable() + "the available throughput. Reduce the frequency or size of your requests." 
); long retryMs = Math.max(PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS, fetchDelayMillis); - rescheduleRunnable(retryMs); + scheduleBackgroundFetch(retryMs); } catch (InterruptedException e) { // may happen if interrupted while BlockingQueue.offer() is waiting @@ -338,7 +332,7 @@ private Runnable getRecordRunnable() "Interrupted while waiting to add record to buffer, retrying in [%,dms]", EXCEPTION_RETRY_DELAY_MS ); - rescheduleRunnable(EXCEPTION_RETRY_DELAY_MS); + scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS); } catch (ExpiredIteratorException e) { log.warn( @@ -348,7 +342,7 @@ private Runnable getRecordRunnable() ); if (recordsResult != null) { shardIterator = recordsResult.getNextShardIterator(); // will be null if the shard has been closed - rescheduleRunnable(fetchDelayMillis); + scheduleBackgroundFetch(fetchDelayMillis); } else { throw new ISE("can't reschedule fetch records runnable, recordsResult is null??"); } @@ -361,7 +355,7 @@ private Runnable getRecordRunnable() catch (AmazonServiceException e) { if (isServiceExceptionRecoverable(e)) { log.warn(e, "encounted unknown recoverable AWS exception, retrying in [%,dms]", EXCEPTION_RETRY_DELAY_MS); - rescheduleRunnable(EXCEPTION_RETRY_DELAY_MS); + scheduleBackgroundFetch(EXCEPTION_RETRY_DELAY_MS); } else { log.warn(e, "encounted unknown unrecoverable AWS exception, will not retry"); throw new RuntimeException(e); @@ -369,31 +363,32 @@ private Runnable getRecordRunnable() } catch (Throwable e) { // non transient errors - log.error(e, "unknown getRecordRunnable exception, will not retry"); + log.error(e, "unknown fetchRecords exception, will not retry"); throw new RuntimeException(e); } }; } - private void rescheduleRunnable(long delayMillis) + private void seek(ShardIteratorType iteratorEnum, String sequenceNumber) { - if (fetchStarted.get()) { - try { - scheduledExec.schedule(getRecordRunnable(), delayMillis, TimeUnit.MILLISECONDS); - } - catch (RejectedExecutionException e) { - log.warn( - e, - "Caught 
RejectedExecutionException, KinesisRecordSupplier for partition[%s] has likely temporarily shutdown the ExecutorService. " - + "This is expected behavior after calling seek(), seekToEarliest() and seekToLatest()", - streamPartition.getPartitionId() - ); + log.debug( + "Seeking partition [%s] to [%s]", + streamPartition.getPartitionId(), + sequenceNumber != null ? sequenceNumber : iteratorEnum.toString() + ); - } - } else { - log.info("Worker for partition[%s] has been stopped", streamPartition.getPartitionId()); - } + shardIterator = wrapExceptions(() -> kinesis.getShardIterator( + streamPartition.getStream(), + streamPartition.getPartitionId(), + iteratorEnum.toString(), + sequenceNumber + ).getShardIterator()); + } + + private long getPartitionTimeLag() + { + return currentLagMillis; } } @@ -420,6 +415,7 @@ private void rescheduleRunnable(long delayMillis) new ConcurrentHashMap<>(); private BlockingQueue> records; + private final boolean backgroundFetchEnabled; private volatile boolean closed = false; private AtomicBoolean partitionsFetchStarted = new AtomicBoolean(); @@ -449,6 +445,7 @@ public KinesisRecordSupplier( this.fetchThreads = fetchThreads; this.recordBufferSize = recordBufferSize; this.useEarliestSequenceNumber = useEarliestSequenceNumber; + this.backgroundFetchEnabled = fetchThreads > 0; // the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache. // The work around here is to use reflection to find the deaggregate function in the classpath. 
See details on the @@ -476,16 +473,18 @@ public KinesisRecordSupplier( getDataHandle = null; } - log.info( - "Creating fetch thread pool of size [%d] (Runtime.availableProcessors=%d)", - fetchThreads, - Runtime.getRuntime().availableProcessors() - ); + if (backgroundFetchEnabled) { + log.info( + "Creating fetch thread pool of size [%d] (Runtime.availableProcessors=%d)", + fetchThreads, + Runtime.getRuntime().availableProcessors() + ); - scheduledExec = Executors.newScheduledThreadPool( - fetchThreads, - Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d") - ); + scheduledExec = Executors.newScheduledThreadPool( + fetchThreads, + Execs.makeThreadFactory("KinesisRecordSupplier-Worker-%d") + ); + } records = new LinkedBlockingQueue<>(recordBufferSize); } @@ -534,11 +533,35 @@ public static AmazonKinesis getAmazonKinesisClient( public void start() { checkIfClosed(); - if (partitionsFetchStarted.compareAndSet(false, true)) { + if (backgroundFetchEnabled && partitionsFetchStarted.compareAndSet(false, true)) { partitionResources.values().forEach(PartitionResource::startBackgroundFetch); } } + @Override + public void close() + { + if (this.closed) { + return; + } + + assign(ImmutableSet.of()); + + scheduledExec.shutdown(); + + try { + if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) { + scheduledExec.shutdownNow(); + } + } + catch (InterruptedException e) { + log.warn(e, "InterruptedException while shutting down"); + throw new RuntimeException(e); + } + + this.closed = true; + } + @Override public void assign(Set> collection) { @@ -561,44 +584,45 @@ public void assign(Set> collection) } } + @Override + public Collection> getAssignment() + { + return partitionResources.keySet(); + } + @Override public void seek(StreamPartition partition, String sequenceNumber) throws InterruptedException { - checkIfClosed(); - filterBufferAndResetFetchRunnable(ImmutableSet.of(partition)); - seekInternal(partition, sequenceNumber, 
ShardIteratorType.AT_SEQUENCE_NUMBER); + filterBufferAndResetBackgroundFetch(ImmutableSet.of(partition)); + partitionSeek(partition, sequenceNumber, ShardIteratorType.AT_SEQUENCE_NUMBER); } @Override public void seekToEarliest(Set> partitions) throws InterruptedException { - checkIfClosed(); - filterBufferAndResetFetchRunnable(partitions); - partitions.forEach(partition -> seekInternal(partition, null, ShardIteratorType.TRIM_HORIZON)); + filterBufferAndResetBackgroundFetch(partitions); + partitions.forEach(partition -> partitionSeek(partition, null, ShardIteratorType.TRIM_HORIZON)); } @Override public void seekToLatest(Set> partitions) throws InterruptedException { - checkIfClosed(); - filterBufferAndResetFetchRunnable(partitions); - partitions.forEach(partition -> seekInternal(partition, null, ShardIteratorType.LATEST)); + filterBufferAndResetBackgroundFetch(partitions); + partitions.forEach(partition -> partitionSeek(partition, null, ShardIteratorType.LATEST)); } + @Nullable @Override - public Collection> getAssignment() + public String getPosition(StreamPartition partition) { - return partitionResources.keySet(); + throw new UnsupportedOperationException("getPosition() is not supported in Kinesis"); } @Nonnull @Override public List> poll(long timeout) { - checkIfClosed(); - if (partitionsFetchStarted.compareAndSet(false, true)) { - partitionResources.values().forEach(PartitionResource::startBackgroundFetch); - } + start(); try { int expectedSize = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll); @@ -630,23 +654,14 @@ public List> poll(long timeout) @Override public String getLatestSequenceNumber(StreamPartition partition) { - checkIfClosed(); - return getSequenceNumberInternal(partition, ShardIteratorType.LATEST); + return getSequenceNumber(partition, ShardIteratorType.LATEST); } @Nullable @Override public String getEarliestSequenceNumber(StreamPartition partition) { - checkIfClosed(); - return getSequenceNumberInternal(partition, 
ShardIteratorType.TRIM_HORIZON); - } - - @Nullable - @Override - public String getPosition(StreamPartition partition) - { - throw new UnsupportedOperationException("getPosition() is not supported in Kinesis"); + return getSequenceNumber(partition, ShardIteratorType.TRIM_HORIZON); } @Override @@ -679,87 +694,204 @@ public Set getPartitionIds(String stream) ); } - @Override - public void close() + /** + * Fetch the partition lag, given a stream and set of current partition offsets. This operates independently from + * the {@link PartitionResource} which have been assigned to this record supplier. + */ + public Map getPartitionsTimeLag(String stream, Map currentOffsets) { - if (this.closed) { - return; - } - - assign(ImmutableSet.of()); - - scheduledExec.shutdown(); - - try { - if (!scheduledExec.awaitTermination(EXCEPTION_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)) { - scheduledExec.shutdownNow(); - } - } - catch (InterruptedException e) { - log.warn(e, "InterruptedException while shutting down"); - throw new RuntimeException(e); + Map partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size()); + for (Map.Entry partitionOffset : currentOffsets.entrySet()) { + StreamPartition partition = new StreamPartition<>(stream, partitionOffset.getKey()); + long currentLag = getPartitionResourcesTimeLag(partition, partitionOffset.getValue()); + partitionLag.put(partitionOffset.getKey(), currentLag); } - - this.closed = true; + return partitionLag; } - // this is only used for tests + /** + * This method is only used for tests to verify that {@link PartitionResource} in fact tracks it's current lag + * as it is polled for records. 
This isn't currently used in production at all, but could be one day if we were + * to prefer to get the lag from the running tasks in the same API call which fetches the current task offsets, + * instead of directly calling the AWS Kinesis API with the offsets returned from those tasks + * (see {@link #getPartitionsTimeLag}, which accepts a map of current partition offsets). + */ @VisibleForTesting - Map getPartitionTimeLag() + Map getPartitionResourcesTimeLag() { return partitionResources.entrySet() .stream() .collect( - Collectors.toMap(k -> k.getKey().getPartitionId(), k -> k.getValue().getPartitionTimeLag()) + Collectors.toMap( + k -> k.getKey().getPartitionId(), + k -> k.getValue().getPartitionTimeLag() + ) ); } - public Map getPartitionTimeLag(Map currentOffsets) - throws InterruptedException, TimeoutException, ExecutionException + @VisibleForTesting + public int bufferSize() { - Map partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size()); - - List>> longo = scheduledExec.invokeAll( - partitionResources.entrySet() - .stream() - .map(partition -> { - final String partitionId = partition.getKey().getPartitionId(); - return partition.getValue().getLagCallable(partitionId, currentOffsets.get(partitionId)); - }) - .collect(Collectors.toList()) - - ); - for (Future> future : longo) { - AbstractMap.SimpleEntry result = future.get(fetchSequenceNumberTimeout, TimeUnit.MILLISECONDS); - partitionLag.put(result.getKey(), result.getValue()); - } + return records.size(); + } - return partitionLag; + @VisibleForTesting + public boolean isBackgroundFetchRunning() + { + return partitionsFetchStarted.get(); } - private void seekInternal(StreamPartition partition, String sequenceNumber, ShardIteratorType iteratorEnum) + /** + * Check that a {@link PartitionResource} has been assigned to this record supplier, and if so call + * {@link PartitionResource#seek} to move it to the latest offsets. 
Note that this method does not restart background + * fetch, which should have been stopped prior to calling this method by a call to + * {@link #filterBufferAndResetBackgroundFetch}. + */ + private void partitionSeek(StreamPartition partition, String sequenceNumber, ShardIteratorType iteratorEnum) { PartitionResource resource = partitionResources.get(partition); if (resource == null) { throw new ISE("Partition [%s] has not been assigned", partition); } + resource.seek(iteratorEnum, sequenceNumber); + } - log.debug( - "Seeking partition [%s] to [%s]", - partition.getPartitionId(), - sequenceNumber != null ? sequenceNumber : iteratorEnum.toString() - ); + /** + * Given a partition and a {@link ShardIteratorType}, create a shard iterator and fetch + * {@link #FETCH_SEQUENCE_NUMBER_RECORD_COUNT} records and return the first sequence number from the result set. + * This method is thread safe as it does not depend on the internal state of the supplier (it doesn't use the + * {@link PartitionResource} which have been assigned to the supplier), and the Kinesis client is thread safe. + */ + @Nullable + private String getSequenceNumber(StreamPartition partition, ShardIteratorType iteratorEnum) + { + return wrapExceptions(() -> { + String shardIterator = + kinesis.getShardIterator(partition.getStream(), partition.getPartitionId(), iteratorEnum.toString()) + .getShardIterator(); + long timeoutMillis = System.currentTimeMillis() + fetchSequenceNumberTimeout; + GetRecordsResult recordsResult = null; + + while (shardIterator != null && System.currentTimeMillis() < timeoutMillis) { + + if (closed) { + log.info("KinesisRecordSupplier closed while fetching sequenceNumber"); + return null; + } + try { + // we call getRecords with limit 1000 to make sure that we can find the first (earliest) record in the shard. 
+ // In the case where the shard is constantly removing records that are past their retention period, it is possible + // that we never find the first record in the shard if we use a limit of 1. + recordsResult = kinesis.getRecords( + new GetRecordsRequest().withShardIterator(shardIterator).withLimit(FETCH_SEQUENCE_NUMBER_RECORD_COUNT) + ); + } + catch (ProvisionedThroughputExceededException e) { + log.warn( + e, + "encountered ProvisionedThroughputExceededException while fetching records, this means " + + "that the request rate for the stream is too high, or the requested data is too large for " + + "the available throughput. Reduce the frequency or size of your requests. Consider increasing " + + "the number of shards to increase throughput." + ); + try { + Thread.sleep(PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS); + continue; + } + catch (InterruptedException e1) { + log.warn(e1, "Thread interrupted!"); + Thread.currentThread().interrupt(); + break; + } + } - resource.shardIterator = wrapExceptions(() -> kinesis.getShardIterator( - partition.getStream(), - partition.getPartitionId(), - iteratorEnum.toString(), - sequenceNumber - ).getShardIterator()); + List records = recordsResult.getRecords(); + + if (!records.isEmpty()) { + return records.get(0).getSequenceNumber(); + } + + shardIterator = recordsResult.getNextShardIterator(); + } + + if (shardIterator == null) { + log.info("Partition[%s] returned a null shard iterator, is the shard closed?", partition.getPartitionId()); + return KinesisSequenceNumber.END_OF_SHARD_MARKER; + } + + + // if we reach here, it usually means either the shard has no more records, or records have not been + // added to this shard + log.warn( + "timed out while trying to fetch position for shard[%s], millisBehindLatest is [%s], likely no more records in shard", + partition.getPartitionId(), + recordsResult != null ? 
recordsResult.getMillisBehindLatest() : "UNKNOWN" + ); + return null; + }); } - private void filterBufferAndResetFetchRunnable(Set> partitions) throws InterruptedException + /** + * Given a {@link StreamPartition} and an offset, create a 'shard iterator' for the offset and fetch a single record + * in order to get the lag: {@link GetRecordsResult#getMillisBehindLatest()}. This method is thread safe as it does + * not depend on the internal state of the supplier (it doesn't use the {@link PartitionResource} which have been + * assigned to the supplier), and the Kinesis client is thread safe. + */ + private Long getPartitionResourcesTimeLag(StreamPartition partition, String offset) + { + return wrapExceptions(() -> { + final String iteratorType; + final String offsetToUse; + if (offset == null || KinesisSupervisor.OFFSET_NOT_SET.equals(offset)) { + if (useEarliestSequenceNumber) { + iteratorType = ShardIteratorType.TRIM_HORIZON.toString(); + offsetToUse = null; + } else { + // if offset is not set and not using earliest, it means we will start reading from latest, + // so lag will be 0 and we have nothing to do here + return 0L; + } + } else { + iteratorType = ShardIteratorType.AT_SEQUENCE_NUMBER.toString(); + offsetToUse = offset; + } + String shardIterator = kinesis.getShardIterator( + partition.getStream(), + partition.getPartitionId(), + iteratorType, + offsetToUse + ).getShardIterator(); + + GetRecordsResult recordsResult = kinesis.getRecords( + new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1) + ); + + return recordsResult.getMillisBehindLatest(); + }); + } + + /** + * Explode if {@link #close()} has been called on the supplier. + */ + private void checkIfClosed() { + if (closed) { + throw new ISE("Invalid operation - KinesisRecordSupplier has already been closed"); + } + } + + /** + * This method must be called before a seek operation ({@link #seek}, {@link #seekToLatest}, or + * {@link #seekToEarliest}). 
+ * + * When called, it will nuke the {@link #scheduledExec} that is shared by all {@link PartitionResource}, filters + * records from the buffer for partitions which will have a seek operation performed, and stops background fetch for + * each {@link PartitionResource} to prepare for the seek. If background fetch is not currently running, the + * {@link #scheduledExec} will not be re-created. + */ + private void filterBufferAndResetBackgroundFetch(Set> partitions) throws InterruptedException + { + checkIfClosed(); if (partitionsFetchStarted.compareAndSet(true, false)) { scheduledExec.shutdown(); @@ -791,118 +923,4 @@ private void filterBufferAndResetFetchRunnable(Set> part // restart fetching threads partitionResources.values().forEach(x -> x.stopBackgroundFetch()); } - - @Nullable - private String getSequenceNumberInternal(StreamPartition partition, ShardIteratorType iteratorEnum) - { - return wrapExceptions(() -> getSequenceNumberInternal( - partition, - kinesis.getShardIterator(partition.getStream(), partition.getPartitionId(), iteratorEnum.toString()) - .getShardIterator() - )); - } - - @Nullable - private String getSequenceNumberInternal(StreamPartition partition, String shardIterator) - { - long timeoutMillis = System.currentTimeMillis() + fetchSequenceNumberTimeout; - GetRecordsResult recordsResult = null; - - while (shardIterator != null && System.currentTimeMillis() < timeoutMillis) { - - if (closed) { - log.info("KinesisRecordSupplier closed while fetching sequenceNumber"); - return null; - } - try { - // we call getRecords with limit 1000 to make sure that we can find the first (earliest) record in the shard. - // In the case where the shard is constantly removing records that are past their retention period, it is possible - // that we never find the first record in the shard if we use a limit of 1. 
- recordsResult = kinesis.getRecords(new GetRecordsRequest().withShardIterator(shardIterator).withLimit(1000)); - } - catch (ProvisionedThroughputExceededException e) { - log.warn( - e, - "encountered ProvisionedThroughputExceededException while fetching records, this means " - + "that the request rate for the stream is too high, or the requested data is too large for " - + "the available throughput. Reduce the frequency or size of your requests. Consider increasing " - + "the number of shards to increase throughput." - ); - try { - Thread.sleep(PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS); - continue; - } - catch (InterruptedException e1) { - log.warn(e1, "Thread interrupted!"); - Thread.currentThread().interrupt(); - break; - } - } - - List records = recordsResult.getRecords(); - - if (!records.isEmpty()) { - return records.get(0).getSequenceNumber(); - } - - shardIterator = recordsResult.getNextShardIterator(); - } - - if (shardIterator == null) { - log.info("Partition[%s] returned a null shard iterator, is the shard closed?", partition.getPartitionId()); - return KinesisSequenceNumber.END_OF_SHARD_MARKER; - } - - - // if we reach here, it usually means either the shard has no more records, or records have not been - // added to this shard - log.warn( - "timed out while trying to fetch position for shard[%s], millisBehindLatest is [%s], likely no more records in shard", - partition.getPartitionId(), - recordsResult != null ? recordsResult.getMillisBehindLatest() : "UNKNOWN" - ); - return null; - - } - - private void checkIfClosed() - { - if (closed) { - throw new ISE("Invalid operation - KinesisRecordSupplier has already been closed"); - } - } - - /** - * Returns an array with the content between the position and limit of "buffer". This may be the buffer's backing - * array itself. Does not modify position or limit of the buffer. 
- */ - private static byte[] toByteArray(final ByteBuffer buffer) - { - if (buffer.hasArray() - && buffer.arrayOffset() == 0 - && buffer.position() == 0 - && buffer.array().length == buffer.limit()) { - return buffer.array(); - } else { - final byte[] retVal = new byte[buffer.remaining()]; - buffer.duplicate().get(retVal); - return retVal; - } - } - - private static T wrapExceptions(Callable callable) - { - try { - return callable.call(); - } - catch (Exception e) { - throw new StreamException(e); - } - } - - @VisibleForTesting - public int bufferSize() - { - return records.size(); - } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index bf979f90267e..532734e0a131 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Joiner; import com.google.common.collect.ImmutableMap; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.indexer.TaskIdUtils; @@ -49,7 +48,6 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; -import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; @@ -64,8 +62,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; /** @@ -85,7 +81,7 @@ public class KinesisSupervisor extends SeekableStreamSupervisor { }; - public static final String NOT_SET = "-1"; + public static final String OFFSET_NOT_SET = "-1"; private final KinesisSupervisorSpec spec; private final AWSCredentialsConfig awsCredentialsConfig; private volatile Map currentPartitionTimeLag; @@ -172,7 +168,7 @@ protected List> createIndexTasks( List> taskList = new ArrayList<>(); for (int i = 0; i < replicas; i++) { - String taskId = Joiner.on("_").join(baseSequenceName, TaskIdUtils.getRandomId()); + String taskId = TaskIdUtils.getRandomIdWithPrefix(baseSequenceName); taskList.add(new KinesisIndexTask( taskId, new TaskResource(baseSequenceName, 1), @@ -192,8 +188,7 @@ protected List> createIndexTasks( @Override - protected RecordSupplier setupRecordSupplier() - throws RuntimeException + protected RecordSupplier setupRecordSupplier() throws RuntimeException { KinesisSupervisorIOConfig ioConfig = spec.getIoConfig(); KinesisIndexTaskTuningConfig taskTuningConfig = spec.getTuningConfig(); @@ -207,7 +202,7 @@ protected RecordSupplier setupRecordSupplier() ), ioConfig.getRecordsPerFetch(), ioConfig.getFetchDelayMillis(), - 1, + 0, // skip starting background fetch, it is not used ioConfig.isDeaggregate(), taskTuningConfig.getRecordBufferSize(), taskTuningConfig.getRecordBufferOfferTimeout(), @@ -335,17 +330,9 @@ protected OrderedSequenceNumber makeSequenceNumber(String seq, boolean i @Override protected void updatePartitionLagFromStream() { - getRecordSupplierLock().lock(); - try { - KinesisRecordSupplier supplier = 
(KinesisRecordSupplier) recordSupplier; - currentPartitionTimeLag = supplier.getPartitionTimeLag(getHighestCurrentOffsets()); - } - catch (InterruptedException | TimeoutException | ExecutionException e) { - throw new StreamException(e); - } - finally { - getRecordSupplierLock().unlock(); - } + KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier; + // this recordSupplier method is thread safe, so does not need to acquire the recordSupplierLock + currentPartitionTimeLag = supplier.getPartitionsTimeLag(getIoConfig().getStream(), getHighestCurrentOffsets()); } @Override @@ -369,7 +356,7 @@ protected String baseTaskName() @Override protected String getNotSetMarker() { - return NOT_SET; + return OFFSET_NOT_SET; } @Override @@ -478,7 +465,7 @@ private SeekableStreamDataSourceMetadata createDataSourceMetadat } } - newSequences = new SeekableStreamStartSequenceNumbers( + newSequences = new SeekableStreamStartSequenceNumbers<>( old.getStream(), null, newPartitionSequenceNumberMap, @@ -486,7 +473,7 @@ private SeekableStreamDataSourceMetadata createDataSourceMetadat newExclusiveStartPartitions ); } else { - newSequences = new SeekableStreamEndSequenceNumbers( + newSequences = new SeekableStreamEndSequenceNumbers<>( old.getStream(), null, newPartitionSequenceNumberMap, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index c886065955d1..4daa2fdc5d9c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2034,7 +2034,7 @@ public void testRestoreAfterPersistingSequences() throws Exception .andReturn(Collections.emptyList()) .anyTimes(); - 
EasyMock.expect(recordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + EasyMock.expect(recordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject())) .andReturn(null) .anyTimes(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index b91a37d4385c..6b2f32f6cdda 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -50,8 +50,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; public class KinesisRecordSupplierTest extends EasyMockSupport @@ -315,7 +313,7 @@ public void testPoll() throws InterruptedException Assert.assertEquals(partitions, recordSupplier.getAssignment()); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS)); - Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag()); + Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag()); } @Test @@ -394,7 +392,7 @@ public void testSeek() Assert.assertEquals(9, polledRecords.size()); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(4, 12))); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS.subList(1, 2))); - Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag()); + Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag()); } @@ -560,7 +558,7 @@ public void testPollAfterSeek() ); // only one partition in this test. 
first results come from getRecordsResult1, which has SHARD1_LAG_MILLIS - Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD1_LAG_MILLIS), recordSupplier.getPartitionTimeLag()); + Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD1_LAG_MILLIS), recordSupplier.getPartitionResourcesTimeLag()); recordSupplier.seek(StreamPartition.of(STREAM, SHARD_ID1), "7"); recordSupplier.start(); @@ -574,7 +572,7 @@ public void testPollAfterSeek() Assert.assertEquals(ALL_RECORDS.get(9), record2); // only one partition in this test. second results come from getRecordsResult0, which has SHARD0_LAG_MILLIS - Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD0_LAG_MILLIS), recordSupplier.getPartitionTimeLag()); + Assert.assertEquals(ImmutableMap.of(SHARD_ID1, SHARD0_LAG_MILLIS), recordSupplier.getPartitionResourcesTimeLag()); verifyAll(); } @@ -652,7 +650,7 @@ public void testPollDeaggregate() throws InterruptedException Assert.assertEquals(partitions, recordSupplier.getAssignment()); Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS)); - Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionTimeLag()); + Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag()); } @Test @@ -711,23 +709,21 @@ private KinesisRecordSupplier getSequenceNumberWhenShardIsEmptyShouldReturnsNull } @Test - public void getPartitionTimeLag() throws InterruptedException, TimeoutException, ExecutionException + public void getPartitionTimeLag() throws InterruptedException { EasyMock.expect(kinesis.getShardIterator( EasyMock.anyObject(), EasyMock.eq(SHARD_ID0), EasyMock.anyString(), EasyMock.anyString() - )).andReturn( - getShardIteratorResult0).anyTimes(); + )).andReturn(getShardIteratorResult0).anyTimes(); EasyMock.expect(kinesis.getShardIterator( EasyMock.anyObject(), EasyMock.eq(SHARD_ID1), EasyMock.anyString(), EasyMock.anyString() - )).andReturn( - getShardIteratorResult1).anyTimes(); + )).andReturn(getShardIteratorResult1).anyTimes(); 
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); @@ -741,8 +737,8 @@ public void getPartitionTimeLag() throws InterruptedException, TimeoutException, EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); - EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); - EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); + EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).times(2); + EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).times(2); replayAll(); @@ -751,7 +747,6 @@ public void getPartitionTimeLag() throws InterruptedException, TimeoutException, StreamPartition.of(STREAM, SHARD_ID1) ); - recordSupplier = new KinesisRecordSupplier( kinesis, recordsPerFetch, @@ -774,16 +769,19 @@ public void getPartitionTimeLag() throws InterruptedException, TimeoutException, Thread.sleep(100); } + Map timeLag = recordSupplier.getPartitionResourcesTimeLag(); + + + Assert.assertEquals(partitions, recordSupplier.getAssignment()); + Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag); + Map offsts = ImmutableMap.of( SHARD_ID1, SHARD1_RECORDS.get(0).getSequenceNumber(), SHARD_ID0, SHARD0_RECORDS.get(0).getSequenceNumber() ); - Map timeLag = recordSupplier.getPartitionTimeLag(offsts); - + Map independentTimeLag = recordSupplier.getPartitionsTimeLag(STREAM, offsts); + Assert.assertEquals(SHARDS_LAG_MILLIS, independentTimeLag); verifyAll(); - - Assert.assertEquals(partitions, recordSupplier.getAssignment()); - Assert.assertEquals(SHARDS_LAG_MILLIS, timeLag); } /** diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 5c9431b59e6f..af59e211ee14 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -279,6 +279,68 @@ public void testNoInitialState() throws Exception ); } + @Test + public void testRecordSupplier() + { + KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( + STREAM, + INPUT_FORMAT, + "awsEndpoint", + null, + 1, + 1, + new Period("PT30M"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + 100, + 1000, + null, + null, + false + ); + KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, OBJECT_MAPPER); + KinesisSupervisor supervisor = new KinesisSupervisor( + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + clientFactory, + OBJECT_MAPPER, + new KinesisSupervisorSpec( + null, + dataSchema, + tuningConfig, + kinesisSupervisorIOConfig, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + clientFactory, + OBJECT_MAPPER, + new NoopServiceEmitter(), + new DruidMonitorSchedulerConfig(), + rowIngestionMetersFactory, + null, + new SupervisorStateManagerConfig() + ), + rowIngestionMetersFactory, + null + ); + + KinesisRecordSupplier supplier = (KinesisRecordSupplier) supervisor.setupRecordSupplier(); + Assert.assertNotNull(supplier); + Assert.assertEquals(0, supplier.bufferSize()); + Assert.assertEquals(Collections.emptySet(), supplier.getAssignment()); + // background fetch should not be enabled for supervisor supplier + supplier.start(); 
+ Assert.assertFalse(supplier.isBackgroundFetchRunning()); + } + @Test public void testMultiTask() throws Exception { @@ -1323,7 +1385,7 @@ public void testDiscoverExistingPublishingTask() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject())) .andReturn(timeLag) .atLeastOnce(); @@ -1484,7 +1546,7 @@ public void testDiscoverExistingPublishingTaskWithDifferentPartitionAllocation() EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject())) .andReturn(timeLag) .atLeastOnce(); @@ -1637,7 +1699,7 @@ public void testDiscoverExistingPublishingAndReadingTask() throws Exception EasyMock.expect(supervisorRecordSupplier.getLatestSequenceNumber(SHARD0_PARTITION)).andReturn("1").anyTimes(); supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); EasyMock.expectLastCall().anyTimes(); - EasyMock.expect(supervisorRecordSupplier.getPartitionTimeLag(EasyMock.anyObject())) + EasyMock.expect(supervisorRecordSupplier.getPartitionsTimeLag(EasyMock.anyString(), EasyMock.anyObject())) .andReturn(timeLag) .atLeastOnce(); Task id1 = createKinesisIndexTask( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 57c74a7d5464..a9ea2bac990c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3077,6 +3077,9 @@ && checkSourceMetadataMatch(dataSourceMetadata)) { return Collections.emptyMap(); } + /** + * Fetches the earliest or latest offset from the stream via the {@link RecordSupplier} + */ @Nullable private SequenceOffsetType getOffsetFromStreamForPartition(PartitionIdType partition, boolean useEarliestOffset) { From 59e5156a21a641937ad9eb8c5b812d6de101062f Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 7 May 2020 00:25:00 -0700 Subject: [PATCH 4/7] fixes --- core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java | 4 ++-- .../apache/druid/indexing/kinesis/KinesisRecordSupplier.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java index 80d48cec08e5..76317d9ca5a2 100644 --- a/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java +++ b/core/src/main/java/org/apache/druid/indexer/TaskIdUtils.java @@ -32,7 +32,7 @@ public class TaskIdUtils { private static final Pattern INVALIDCHARS = Pattern.compile("(?s).*[^\\S ].*"); - private static final Joiner underscoreJoiner = Joiner.on("_"); + private static final Joiner UNDERSCORE_JOINER = Joiner.on("_"); public static void validateId(String thingToValidate, String stringToValidate) { @@ -66,6 +66,6 @@ public static String getRandomId() public static String getRandomIdWithPrefix(String prefix) { - return underscoreJoiner.join(prefix, TaskIdUtils.getRandomId()); + return UNDERSCORE_JOINER.join(prefix, TaskIdUtils.getRandomId()); } } diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index 133d4cb1158e..617ee010f987 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -703,7 +703,7 @@ public Map getPartitionsTimeLag(String stream, Map Map partitionLag = Maps.newHashMapWithExpectedSize(currentOffsets.size()); for (Map.Entry partitionOffset : currentOffsets.entrySet()) { StreamPartition partition = new StreamPartition<>(stream, partitionOffset.getKey()); - long currentLag = getPartitionResourcesTimeLag(partition, partitionOffset.getValue()); + long currentLag = getPartitionTimeLag(partition, partitionOffset.getValue()); partitionLag.put(partitionOffset.getKey(), currentLag); } return partitionLag; @@ -837,7 +837,7 @@ private String getSequenceNumber(StreamPartition partition, ShardIterato * not depend on the internal state of the supplier (it doesn't use the {@link PartitionResource} which have been * assigned to the supplier), and the Kinesis client is thread safe. 
*/ - private Long getPartitionResourcesTimeLag(StreamPartition partition, String offset) + private Long getPartitionTimeLag(StreamPartition partition, String offset) { return wrapExceptions(() -> { final String iteratorType; From f1c5e732dca61246bb3bb819515331b083923a5f Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 7 May 2020 00:38:21 -0700 Subject: [PATCH 5/7] keep collecting current offsets and lag if unhealthy in background reporting thread --- .../seekablestream/supervisor/SeekableStreamSupervisor.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index a9ea2bac990c..db3470733d31 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3159,8 +3159,10 @@ private void createTasksForGroup(int groupId, int replicas) @VisibleForTesting public void updateCurrentAndLatestOffsets() { - // if we aren't in a steady state, chill out for a bit, don't worry, we'll get called later - if (stateManager.isSteadyState()) { + // if we aren't in a steady state, chill out for a bit, don't worry, we'll get called later, but if we aren't + // healthy, go ahead and try anyway to provide insight, if possible, into how much time is left to fix the + // issue for cluster operators since this feeds the lag metrics + if (stateManager.isSteadyState() || !stateManager.isHealthy()) { try { updateCurrentOffsets(); updatePartitionLagFromStream(); From b909e9e4cd0fd93b39d691a047757841bef41112 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 15 May 2020 02:17:15 -0700 Subject: [PATCH 6/7] review stuffs --- 
.../kinesis/KinesisRecordSupplier.java | 75 ++++++++++--------- .../kinesis/KinesisSequenceNumber.java | 28 ++++--- .../supervisor/SeekableStreamSupervisor.java | 9 +-- 3 files changed, 59 insertions(+), 53 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java index 617ee010f987..b32ee149f544 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java @@ -54,6 +54,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; @@ -82,6 +83,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; @@ -95,7 +97,14 @@ public class KinesisRecordSupplier implements RecordSupplier private static final EmittingLogger log = new EmittingLogger(KinesisRecordSupplier.class); private static final long PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS = 3000; private static final long EXCEPTION_RETRY_DELAY_MS = 10000; - private static final int FETCH_SEQUENCE_NUMBER_RECORD_COUNT = 1000; + + /** + * We call getRecords with limit 1000 to make sure that we can find the first 
(earliest) record in the shard. + * In the case where the shard is constantly removing records that are past their retention period, it is possible + * that we never find the first record in the shard if we use a limit of 1. + */ + private static final int GET_SEQUENCE_NUMBER_RECORD_COUNT = 1000; + private static final int GET_SEQUENCE_NUMBER_RETRY_COUNT = 10; private static boolean isServiceExceptionRecoverable(AmazonServiceException ex) { @@ -148,6 +157,7 @@ private class PartitionResource private volatile long currentLagMillis; private final AtomicBoolean fetchStarted = new AtomicBoolean(); + private ScheduledFuture currentFetch; private PartitionResource(StreamPartition streamPartition) { @@ -169,7 +179,7 @@ private void startBackgroundFetch() return; } if (fetchStarted.compareAndSet(false, true)) { - log.info( + log.debug( "Starting scheduled fetch for stream[%s] partition[%s]", streamPartition.getStream(), streamPartition.getPartitionId() @@ -182,11 +192,14 @@ private void startBackgroundFetch() private void stopBackgroundFetch() { if (fetchStarted.compareAndSet(true, false)) { - log.info( + log.debug( "Stopping scheduled fetch for stream[%s] partition[%s]", streamPartition.getStream(), streamPartition.getPartitionId() ); + if (currentFetch != null && !currentFetch.isDone()) { + currentFetch.cancel(true); + } } } @@ -194,7 +207,7 @@ private void scheduleBackgroundFetch(long delayMillis) { if (fetchStarted.get()) { try { - scheduledExec.schedule(fetchRecords(), delayMillis, TimeUnit.MILLISECONDS); + currentFetch = scheduledExec.schedule(fetchRecords(), delayMillis, TimeUnit.MILLISECONDS); } catch (RejectedExecutionException e) { log.warn( @@ -206,7 +219,7 @@ private void scheduleBackgroundFetch(long delayMillis) } } else { - log.info("Worker for partition[%s] has been stopped", streamPartition.getPartitionId()); + log.debug("Worker for partition[%s] is already stopped", streamPartition.getPartitionId()); } } @@ -214,7 +227,7 @@ private Runnable fetchRecords() 
{ return () -> { if (!fetchStarted.get()) { - log.info("Worker for partition[%s] has been stopped", streamPartition.getPartitionId()); + log.debug("Worker for partition[%s] has been stopped", streamPartition.getPartitionId()); return; } @@ -758,7 +771,7 @@ private void partitionSeek(StreamPartition partition, String sequenceNum /** * Given a partition and a {@link ShardIteratorType}, create a shard iterator and fetch - * {@link #FETCH_SEQUENCE_NUMBER_RECORD_COUNT} records and return the first sequence number from the result set. + * {@link #GET_SEQUENCE_NUMBER_RECORD_COUNT} records and return the first sequence number from the result set. * This method is thread safe as it does not depend on the internal state of the supplier (it doesn't use the * {@link PartitionResource} which have been assigned to the supplier), and the Kinesis client is thread safe. */ @@ -778,32 +791,26 @@ private String getSequenceNumber(StreamPartition partition, ShardIterato log.info("KinesisRecordSupplier closed while fetching sequenceNumber"); return null; } - try { - // we call getRecords with limit 1000 to make sure that we can find the first (earliest) record in the shard. - // In the case where the shard is constantly removing records that are past their retention period, it is possible - // that we never find the first record in the shard if we use a limit of 1. - recordsResult = kinesis.getRecords( - new GetRecordsRequest().withShardIterator(shardIterator).withLimit(FETCH_SEQUENCE_NUMBER_RECORD_COUNT) - ); - } - catch (ProvisionedThroughputExceededException e) { - log.warn( - e, - "encountered ProvisionedThroughputExceededException while fetching records, this means " - + "that the request rate for the stream is too high, or the requested data is too large for " - + "the available throughput. Reduce the frequency or size of your requests. Consider increasing " - + "the number of shards to increase throughput." 
- ); - try { - Thread.sleep(PROVISIONED_THROUGHPUT_EXCEEDED_BACKOFF_MS); - continue; - } - catch (InterruptedException e1) { - log.warn(e1, "Thread interrupted!"); - Thread.currentThread().interrupt(); - break; - } - } + final String currentShardIterator = shardIterator; + final GetRecordsRequest request = new GetRecordsRequest().withShardIterator(currentShardIterator) + .withLimit(GET_SEQUENCE_NUMBER_RECORD_COUNT); + recordsResult = RetryUtils.retry( + () -> kinesis.getRecords(request), + (throwable) -> { + if (throwable instanceof ProvisionedThroughputExceededException) { + log.warn( + throwable, + "encountered ProvisionedThroughputExceededException while fetching records, this means " + + "that the request rate for the stream is too high, or the requested data is too large for " + + "the available throughput. Reduce the frequency or size of your requests. Consider increasing " + + "the number of shards to increase throughput." + ); + return true; + } + return false; + }, + GET_SEQUENCE_NUMBER_RETRY_COUNT + ); List records = recordsResult.getRecords(); @@ -892,7 +899,7 @@ private void checkIfClosed() private void filterBufferAndResetBackgroundFetch(Set> partitions) throws InterruptedException { checkIfClosed(); - if (partitionsFetchStarted.compareAndSet(true, false)) { + if (backgroundFetchEnabled && partitionsFetchStarted.compareAndSet(true, false)) { scheduledExec.shutdown(); try { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java index 4bd8fe8d4872..ab13f5bda05c 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSequenceNumber.java @@ -36,27 +36,33 @@ public class KinesisSequenceNumber 
extends OrderedSequenceNumber */ public static final String END_OF_SHARD_MARKER = "EOS"; - // this special marker is used by the KinesisSupervisor to set the endOffsets - // of newly created indexing tasks. This is necessary because streaming tasks do not - // have endPartitionOffsets. This marker signals to the task that it should continue - // to ingest data until taskDuration has elapsed or the task was stopped or paused or killed + + /** + * This special marker is used by the KinesisSupervisor to set the endOffsets of newly created indexing tasks. This + * is necessary because streaming tasks do not have endPartitionOffsets. This marker signals to the task that it + * should continue to ingest data until taskDuration has elapsed or the task was stopped or paused or killed. + */ public static final String NO_END_SEQUENCE_NUMBER = "NO_END_SEQUENCE_NUMBER"; - // this special marker is used by the KinesisSupervisor to mark that a shard has been expired - // (i.e., closed and then the retention period has passed) + + /** + * This special marker is used by the KinesisSupervisor to mark that a shard has been expired + * (i.e., closed and then the retention period has passed) + */ public static final String EXPIRED_MARKER = "EXPIRED"; - // this flag is used to indicate either END_OF_SHARD_MARKER - // or NO_END_SEQUENCE_NUMBER so that they can be properly compared - // with other sequence numbers + /** + * this flag is used to indicate either END_OF_SHARD_MARKER + * or NO_END_SEQUENCE_NUMBER so that they can be properly compared + * with other sequence numbers + */ private final boolean isMaxSequenceNumber; private final BigInteger intSequence; private KinesisSequenceNumber(String sequenceNumber, boolean isExclusive) { super(sequenceNumber, isExclusive); - if (END_OF_SHARD_MARKER.equals(sequenceNumber) - || NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)) { + if (END_OF_SHARD_MARKER.equals(sequenceNumber) || NO_END_SEQUENCE_NUMBER.equals(sequenceNumber)) { 
isMaxSequenceNumber = true; this.intSequence = null; } else { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index db3470733d31..cc4b986bd1a7 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2016,14 +2016,7 @@ private boolean updatePartitionDataFromStream() taskGroupId ); - newlyDiscovered.compute(taskGroupId, (groupId, partitions) -> { - if (partitions == null) { - partitions = new ArrayList<>(); - } - partitions.add(partitionId); - return partitions; - }); - newlyDiscovered.get(taskGroupId).add(partitionId); + newlyDiscovered.computeIfAbsent(taskGroupId, ArrayList::new).add(partitionId); } } From 56fcb2cb58535ed08a750b2c562cda452ce1eeb7 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Fri, 15 May 2020 02:21:48 -0700 Subject: [PATCH 7/7] add comment --- .../druid/indexing/kafka/supervisor/KafkaSupervisor.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 0945924cfb99..b2352fe40c49 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -354,6 +354,9 @@ protected void updatePartitionLagFromStream() recordSupplier.seekToLatest(partitions); + // this method isn't actually computing the lag, just fetching the latest offsets from the stream. 
This is + // because we currently only have record lag for Kafka, which can be lazily computed by subtracting the highest + // task offsets from the latest offsets from the stream when it is needed latestSequenceFromStream = partitions.stream().collect(Collectors.toMap(StreamPartition::getPartitionId, recordSupplier::getPosition)); }