diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md index 5071b1533665..7d0709d99cf9 100644 --- a/docs/development/extensions-core/kinesis-ingestion.md +++ b/docs/development/extensions-core/kinesis-ingestion.md @@ -241,11 +241,9 @@ The following table outlines the configuration options for `ioConfig`: |`completionTimeout`|ISO 8601 period|The length of time to wait before Druid declares a publishing task has failed and terminates it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|No|PT6H| |`lateMessageRejectionPeriod`|ISO 8601 period|Configure tasks to reject messages with timestamps earlier than this period before the task is created. For example, if `lateMessageRejectionPeriod` is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, messages with timestamps earlier than `2016-01-01T11:00Z` are dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a streaming and a nightly batch ingestion pipeline.|No|| |`earlyMessageRejectionPeriod`|ISO 8601 period|Configure tasks to reject messages with timestamps later than this period after the task reached its `taskDuration`. For example, if `earlyMessageRejectionPeriod` is set to `PT1H`, the `taskDuration` is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`. Messages with timestamps later than `2016-01-01T14:00Z` are dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. 
Setting `earlyMessageRejectionPeriod` too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|No|| -|`recordsPerFetch`|Integer|The number of records to request per call to fetch records from Kinesis.|No| See [Determine fetch settings](#determine-fetch-settings) for defaults.| |`fetchDelayMillis`|Integer|Time in milliseconds to wait between subsequent calls to fetch records from Kinesis. See [Determine fetch settings](#determine-fetch-settings).|No|0| |`awsAssumedRoleArn`|String|The AWS assumed role to use for additional permissions.|No|| |`awsExternalId`|String|The AWS external ID to use for additional permissions.|No|| -|`deaggregate`|Boolean|Whether to use the deaggregate function of the Kinesis Client Library (KCL).|No|| |`autoScalerConfig`|Object|Defines autoscaling behavior for Kinesis ingest tasks. See [Task autoscaler properties](#task-autoscaler-properties) for more information.|No|null| ### Task autoscaler properties @@ -406,7 +404,7 @@ The following table outlines the configuration options for `tuningConfig`: |`chatRetries`|Integer|The number of times Druid retries HTTP requests to indexing tasks before considering tasks unresponsive.|No|8| |`httpTimeout`|ISO 8601 period|The period of time to wait for a HTTP response from an indexing task.|No|PT10S| |`shutdownTimeout`|ISO 8601 period|The period of time to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|No|PT80S| -|`recordBufferSize`|Integer|The size of the buffer (number of events) Druid uses between the Kinesis fetch threads and the main ingestion thread.|No|See [Determine fetch settings](#determine-fetch-settings) for defaults.| +|`recordBufferSizeBytes`|Integer| The size of the buffer (heap memory bytes) Druid uses between the Kinesis fetch threads and the main ingestion thread.|No| See [Determine fetch settings](#determine-fetch-settings) for defaults.| |`recordBufferOfferTimeout`|Integer|The number of 
milliseconds to wait for space to become available in the buffer before timing out.|No|5000| |`recordBufferFullWait`|Integer|The number of milliseconds to wait for the buffer to drain before Druid attempts to fetch records from Kinesis again.|No|5000| |`fetchThreads`|Integer|The size of the pool of threads fetching data from Kinesis. There is no benefit in having more threads than Kinesis shards.|No| `procs * 2`, where `procs` is the number of processors available to the task.| @@ -414,7 +412,7 @@ The following table outlines the configuration options for `tuningConfig`: |`logParseExceptions`|Boolean|If `true`, Druid logs an error message when a parsing exception occurs, containing information about the row where the error occurred.|No|`false`| |`maxParseExceptions`|Integer|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if `reportParseExceptions` is set.|No|unlimited| |`maxSavedParseExceptions`|Integer|When a parse exception occurs, Druid keeps track of the most recent parse exceptions. `maxSavedParseExceptions` limits the number of saved exception instances. These saved exceptions are available after the task finishes in the [task completion report](../../ingestion/tasks.md#task-reports). Overridden if `reportParseExceptions` is set.|No|0| -|`maxRecordsPerPoll`|Integer|The maximum number of records to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))`.|No| See [Determine fetch settings](#determine-fetch-settings) for defaults.| +|`maxBytesPerPoll`|Integer| The maximum number of bytes to be fetched from buffer per poll. At least one record is polled from the buffer regardless of this config.|No| 1000000 bytes| |`repartitionTransitionDuration`|ISO 8601 period|When shards are split or merged, the supervisor recomputes shard to task group mappings. 
The supervisor also signals any running tasks created under the old mappings to stop early at current time + `repartitionTransitionDuration`. Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split or merge, which helps avoid issues with [empty shard handling](https://github.com/apache/druid/issues/7600).|No|PT2M| |`offsetFetchPeriod`|ISO 8601 period|Determines how often the supervisor queries Kinesis and the indexing tasks to fetch current offsets and calculate lag. If the user-specified value is below the minimum value of PT5S, the supervisor ignores the value and uses the minimum value instead.|No|PT30S| |`useListShards`|Boolean|Indicates if `listShards` API of AWS Kinesis SDK can be used to prevent `LimitExceededException` during ingestion. You must set the necessary `IAM` permissions.|No|`false`| @@ -656,25 +654,22 @@ For more detail, see [Segment size optimization](../../operations/segment-optimi Kinesis indexing tasks fetch records using `fetchThreads` threads. If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused. -Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches +Each fetch thread fetches up to 10 MB of records at once from a Kinesis shard, with a delay between fetches of `fetchDelayMillis`. -The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`. +The records fetched by each thread are pushed into a shared queue of size `recordBufferSizeBytes`. The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once. 
-When using Kinesis Producer Library's aggregation feature, that is when [`deaggregate`](#deaggregation) is set, -each of these parameters refers to aggregated records rather than individual records. - The default values for these parameters are: - `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that -particular server). +particular server). This value is further limited so that the total record data fetched at a given time does not +exceed 5% of the max heap configured, assuming that each thread fetches 10 MB of records at once. If the value specified +for this configuration is higher than this limit, no failure occurs, but a warning is logged, and the value is +implicitly lowered to the max allowed by this constraint. - `fetchDelayMillis`: 0 (no delay between fetches). -- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`. -For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation). -- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller. -For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation). -- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation). +- `recordBufferSizeBytes`: 100 MB or an estimated 10% of available heap, whichever is smaller. +- `maxBytesPerPoll`: 1000000. Kinesis places the following restrictions on calls to fetch records: @@ -697,8 +692,6 @@ Kinesis stream. The Kinesis indexing service supports de-aggregation of multiple rows packed into a single record by the Kinesis Producer Library's aggregate method for more efficient data transfer. 
-To enable this feature, set `deaggregate` to true in your `ioConfig` when submitting a supervisor spec. - ## Resharding [Resharding](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding.html) is an advanced operation that lets you adjust the number of shards in a stream to adapt to changes in the rate of data flowing through a stream. diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java index 87183d62fe75..661543f707cb 100644 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java +++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java @@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import org.apache.druid.emitter.kafka.KafkaEmitterConfig.EventType; -import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer; +import org.apache.druid.java.util.common.MemoryBoundLinkedBlockingQueue; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; @@ -176,7 +176,7 @@ private void sendSegmentMetadataToKafka() private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue recordQueue, Callback callback) { - ObjectContainer objectToSend; + MemoryBoundLinkedBlockingQueue.ObjectContainer objectToSend; try { while (true) { objectToSend = recordQueue.take(); @@ -206,7 +206,7 @@ public void emit(final Event event) String resultJson = jsonMapper.writeValueAsString(map); - ObjectContainer objectContainer = new ObjectContainer<>( + MemoryBoundLinkedBlockingQueue.ObjectContainer objectContainer = new MemoryBoundLinkedBlockingQueue.ObjectContainer<>( resultJson, 
StringUtils.toUtf8(resultJson).length ); diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java deleted file mode 100644 index fb6cae8ee954..000000000000 --- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/MemoryBoundLinkedBlockingQueue.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.emitter.kafka; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicLong; - -/** - * Similar to LinkedBlockingQueue but can be bounded by the total byte size of the items present in the queue - * rather than number of items. 
- */ -public class MemoryBoundLinkedBlockingQueue -{ - private final long memoryBound; - private final AtomicLong currentMemory; - private final LinkedBlockingQueue> queue; - - public MemoryBoundLinkedBlockingQueue(long memoryBound) - { - this.memoryBound = memoryBound; - this.currentMemory = new AtomicLong(0L); - this.queue = new LinkedBlockingQueue<>(); - } - - // returns true/false depending on whether item was added or not - public boolean offer(ObjectContainer item) - { - final long itemLength = item.getSize(); - - if (currentMemory.addAndGet(itemLength) <= memoryBound) { - if (queue.offer(item)) { - return true; - } - } - currentMemory.addAndGet(-itemLength); - return false; - } - - // blocks until at least one item is available to take - public ObjectContainer take() throws InterruptedException - { - final ObjectContainer ret = queue.take(); - currentMemory.addAndGet(-ret.getSize()); - return ret; - } - - public static class ObjectContainer - { - private T data; - private long size; - - ObjectContainer(T data, long size) - { - this.data = data; - this.size = size; - } - - public T getData() - { - return data; - } - - public long getSize() - { - return size; - } - } -} diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index 1aa6d5b2e3be..fb019f10030b 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -49,6 +49,10 @@ public class KinesisIndexTask extends SeekableStreamIndexTask { private static final String TYPE = "index_kinesis"; + + // GetRecords returns maximum 10MB per call + // (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) + private static final long 
GET_RECORDS_MAX_BYTES_PER_CALL = 10_000_000L; private static final Logger log = new Logger(KinesisIndexTask.class); private final boolean useListShards; @@ -84,6 +88,14 @@ public KinesisIndexTask( public TaskStatus runTask(TaskToolbox toolbox) { this.runtimeInfo = toolbox.getAdjustedRuntimeInfo(); + if (getTuningConfig().getRecordBufferSizeConfigured() != null) { + log.warn("The 'recordBufferSize' config property of the kinesis tuning config has been deprecated. " + + "Please use 'recordBufferSizeBytes'."); + } + if (getTuningConfig().getMaxRecordsPerPollConfigured() != null) { + log.warn("The 'maxRecordsPerPoll' config property of the kinesis tuning config has been deprecated. " + + "Please use 'maxBytesPerPoll'."); + } return super.runTask(toolbox); } @@ -105,21 +117,18 @@ protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) { KinesisIndexTaskIOConfig ioConfig = ((KinesisIndexTaskIOConfig) super.ioConfig); KinesisIndexTaskTuningConfig tuningConfig = ((KinesisIndexTaskTuningConfig) super.tuningConfig); + final int recordBufferSizeBytes = + tuningConfig.getRecordBufferSizeBytesOrDefault(runtimeInfo.getMaxHeapSizeBytes()); final int fetchThreads = computeFetchThreads(runtimeInfo, tuningConfig.getFetchThreads()); - final int recordsPerFetch = ioConfig.getRecordsPerFetchOrDefault(runtimeInfo.getMaxHeapSizeBytes(), fetchThreads); - final int recordBufferSize = - tuningConfig.getRecordBufferSizeOrDefault(runtimeInfo.getMaxHeapSizeBytes(), ioConfig.isDeaggregate()); - final int maxRecordsPerPoll = tuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate()); + final int maxBytesPerPoll = tuningConfig.getMaxBytesPerPollOrDefault(); log.info( - "Starting record supplier with fetchThreads [%d], fetchDelayMillis [%d], recordsPerFetch [%d], " - + "recordBufferSize [%d], maxRecordsPerPoll [%d], deaggregate [%s].", + "Starting record supplier with fetchThreads [%d], fetchDelayMillis [%d], " + + "recordBufferSizeBytes [%d], maxBytesPerPoll 
[%d]", fetchThreads, ioConfig.getFetchDelayMillis(), - recordsPerFetch, - recordBufferSize, - maxRecordsPerPoll, - ioConfig.isDeaggregate() + recordBufferSizeBytes, + maxBytesPerPoll ); return new KinesisRecordSupplier( @@ -129,19 +138,24 @@ protected KinesisRecordSupplier newTaskRecordSupplier(final TaskToolbox toolbox) ioConfig.getAwsAssumedRoleArn(), ioConfig.getAwsExternalId() ), - recordsPerFetch, ioConfig.getFetchDelayMillis(), fetchThreads, - ioConfig.isDeaggregate(), - recordBufferSize, + recordBufferSizeBytes, tuningConfig.getRecordBufferOfferTimeout(), tuningConfig.getRecordBufferFullWait(), - maxRecordsPerPoll, + maxBytesPerPoll, false, useListShards ); } + @Override + @JsonProperty + public KinesisIndexTaskTuningConfig getTuningConfig() + { + return (KinesisIndexTaskTuningConfig) super.getTuningConfig(); + } + @Override @JsonProperty("ioConfig") public KinesisIndexTaskIOConfig getIOConfig() @@ -179,15 +193,38 @@ AWSCredentialsConfig getAwsCredentialsConfig() } @VisibleForTesting - static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads) + static int computeFetchThreads( + final RuntimeInfo runtimeInfo, + final Integer configuredFetchThreads + ) { - final int fetchThreads; + int fetchThreads; if (configuredFetchThreads != null) { fetchThreads = configuredFetchThreads; } else { fetchThreads = runtimeInfo.getAvailableProcessors() * 2; } + // Each fetchThread can return up to 10 MB at a time + // (https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html), cap fetchThreads so that + // we don't exceed the lesser of 100 MB or 5% of heap at a time. Don't fail if fetchThreads specified + // is greater than this, so as not to cause failure for older configurations, but log warning in this case, and lower + // fetchThreads implicitly. 
+ final long memoryToUse = Math.min( + KinesisIndexTaskIOConfig.MAX_RECORD_FETCH_MEMORY, + (long) (runtimeInfo.getMaxHeapSizeBytes() * KinesisIndexTaskIOConfig.RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION) + ); + int maxFetchThreads = Math.max( + 1, + (int) (memoryToUse / GET_RECORDS_MAX_BYTES_PER_CALL) + ); + if (fetchThreads > maxFetchThreads) { + if (configuredFetchThreads != null) { + log.warn("fetchThreads [%d] being lowered to [%d]", configuredFetchThreads, maxFetchThreads); + } + fetchThreads = maxFetchThreads; + } + Preconditions.checkArgument( fetchThreads > 0, "Must have at least one background fetch thread for the record supplier" diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java index 0572c3170062..881d68ba8968 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.primitives.Ints; import org.apache.druid.data.input.InputFormat; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; @@ -41,21 +40,19 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig data; + if (deaggregateHandle == null || getDataHandle == null) { + throw new ISE("deaggregateHandle or getDataHandle is null!"); + } - if (deaggregate) { - if (deaggregateHandle == null || getDataHandle == null) { - throw new ISE("deaggregateHandle or getDataHandle is null!"); - } - - data 
= new ArrayList<>(); + data = new ArrayList<>(); - final List userRecords = (List) deaggregateHandle.invokeExact( - Collections.singletonList(kinesisRecord) - ); + final List userRecords = (List) deaggregateHandle.invokeExact( + Collections.singletonList(kinesisRecord) + ); - for (Object userRecord : userRecords) { - data.add(new ByteEntity((ByteBuffer) getDataHandle.invoke(userRecord))); - } - } else { - data = Collections.singletonList(new ByteEntity(kinesisRecord.getData())); + int recordSize = 0; + for (Object userRecord : userRecords) { + ByteEntity byteEntity = new ByteEntity((ByteBuffer) getDataHandle.invoke(userRecord)); + recordSize += byteEntity.getBuffer().array().length; + data.add(byteEntity); } + currRecord = new OrderedPartitionableRecord<>( streamPartition.getStream(), streamPartition.getPartitionId(), @@ -277,10 +278,11 @@ private Runnable fetchRecords() if (log.isTraceEnabled()) { log.trace( - "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferRemainingCapacity[%d]: %s", + "Stream[%s] / partition[%s] / sequenceNum[%s] / bufferByteSize[%d] / bufferRemainingByteCapacity[%d]: %s", currRecord.getStream(), currRecord.getPartitionId(), currRecord.getSequenceNumber(), + records.byteSize(), records.remainingCapacity(), currRecord.getData() .stream() @@ -292,24 +294,18 @@ private Runnable fetchRecords() ); } - // If the buffer was full and we weren't able to add the message, grab a new stream iterator starting - // from this message and back off for a bit to let the buffer drain before retrying. - if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) { + recordBufferOfferWaitMillis = recordBufferOfferTimeout; + while (!records.offer( + new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(currRecord, recordSize), + recordBufferOfferWaitMillis, + TimeUnit.MILLISECONDS + )) { log.warn( "Kinesis records are being processed slower than they are fetched. 
" + "OrderedPartitionableRecord buffer full, storing iterator and retrying in [%,dms].", recordBufferFullWait ); - - shardIterator = kinesis.getShardIterator( - currRecord.getStream(), - currRecord.getPartitionId(), - ShardIteratorType.AT_SEQUENCE_NUMBER.toString(), - currRecord.getSequenceNumber() - ).getShardIterator(); - - scheduleBackgroundFetch(recordBufferFullWait); - return; + recordBufferOfferWaitMillis = recordBufferFullWait; } } @@ -399,15 +395,12 @@ private long getPartitionTimeLag() private final MethodHandle getDataHandle; private final AmazonKinesis kinesis; - - private final int recordsPerFetch; private final int fetchDelayMillis; - private final boolean deaggregate; private final int recordBufferOfferTimeout; private final int recordBufferFullWait; - private final int maxRecordsPerPoll; + private final int maxBytesPerPoll; private final int fetchThreads; - private final int recordBufferSize; + private final int recordBufferSizeBytes; private final boolean useEarliestSequenceNumber; private final boolean useListShards; @@ -415,7 +408,7 @@ private long getPartitionTimeLag() private final ConcurrentMap, PartitionResource> partitionResources = new ConcurrentHashMap<>(); - private BlockingQueue> records; + private MemoryBoundLinkedBlockingQueue> records; private final boolean backgroundFetchEnabled; private volatile boolean closed = false; @@ -423,56 +416,48 @@ private long getPartitionTimeLag() public KinesisRecordSupplier( AmazonKinesis amazonKinesis, - int recordsPerFetch, int fetchDelayMillis, int fetchThreads, - boolean deaggregate, - int recordBufferSize, + int recordBufferSizeBytes, int recordBufferOfferTimeout, int recordBufferFullWait, - int maxRecordsPerPoll, + int maxBytesPerPoll, boolean useEarliestSequenceNumber, boolean useListShards ) { Preconditions.checkNotNull(amazonKinesis); this.kinesis = amazonKinesis; - this.recordsPerFetch = recordsPerFetch; this.fetchDelayMillis = fetchDelayMillis; - this.deaggregate = deaggregate; 
this.recordBufferOfferTimeout = recordBufferOfferTimeout; this.recordBufferFullWait = recordBufferFullWait; - this.maxRecordsPerPoll = maxRecordsPerPoll; + this.maxBytesPerPoll = maxBytesPerPoll; this.fetchThreads = fetchThreads; - this.recordBufferSize = recordBufferSize; + this.recordBufferSizeBytes = recordBufferSizeBytes; this.useEarliestSequenceNumber = useEarliestSequenceNumber; this.useListShards = useListShards; this.backgroundFetchEnabled = fetchThreads > 0; - // the deaggregate function is implemented by the amazon-kinesis-client, whose license is not compatible with Apache. - // The work around here is to use reflection to find the deaggregate function in the classpath. See details on the - // docs page for more information on how to use deaggregation - if (deaggregate) { - try { - Class kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord"); - MethodHandles.Lookup lookup = MethodHandles.publicLookup(); + // The deaggregate function is implemented by the amazon-kinesis-client, whose license was formerly not compatible + // with Apache. The code here avoids the license issue by using reflection, but is no longer necessary since + // amazon-kinesis-client is now Apache-licensed and is now a dependency of Druid. This code could safely be + // modified to use regular calls rather than reflection. 
+ try { + Class kclUserRecordclass = Class.forName("com.amazonaws.services.kinesis.clientlibrary.types.UserRecord"); + MethodHandles.Lookup lookup = MethodHandles.publicLookup(); - Method deaggregateMethod = kclUserRecordclass.getMethod("deaggregate", List.class); - Method getDataMethod = kclUserRecordclass.getMethod("getData"); + Method deaggregateMethod = kclUserRecordclass.getMethod("deaggregate", List.class); + Method getDataMethod = kclUserRecordclass.getMethod("getData"); - deaggregateHandle = lookup.unreflect(deaggregateMethod); - getDataHandle = lookup.unreflect(getDataMethod); - } - catch (ClassNotFoundException e) { - throw new ISE(e, "cannot find class[com.amazonaws.services.kinesis.clientlibrary.types.UserRecord], " - + "note that when using deaggregate=true, you must provide the Kinesis Client Library jar in the classpath"); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } else { - deaggregateHandle = null; - getDataHandle = null; + deaggregateHandle = lookup.unreflect(deaggregateMethod); + getDataHandle = lookup.unreflect(getDataMethod); + } + catch (ClassNotFoundException e) { + throw new ISE(e, "cannot find class[com.amazonaws.services.kinesis.clientlibrary.types.UserRecord], " + + "note that when using deaggregate=true, you must provide the Kinesis Client Library jar in the classpath"); + } + catch (Exception e) { + throw new RuntimeException(e); } if (backgroundFetchEnabled) { @@ -488,7 +473,7 @@ public KinesisRecordSupplier( ); } - records = new LinkedBlockingQueue<>(recordBufferSize); + records = new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes); } public static AmazonKinesis getAmazonKinesisClient( @@ -635,23 +620,19 @@ public List> poll(long ti start(); try { - int expectedSize = Math.min(Math.max(records.size(), 1), maxRecordsPerPoll); - - List> polledRecords = new ArrayList<>(expectedSize); + List>> polledRecords = new ArrayList<>(); - Queues.drain( - records, + records.drain( polledRecords, - expectedSize, + 
maxBytesPerPoll, timeout, TimeUnit.MILLISECONDS ); - polledRecords = polledRecords.stream() - .filter(x -> partitionResources.containsKey(x.getStreamPartition())) - .collect(Collectors.toList()); - - return polledRecords; + return polledRecords.stream() + .filter(x -> partitionResources.containsKey(x.getData().getStreamPartition())) + .map(MemoryBoundLinkedBlockingQueue.ObjectContainer::getData) + .collect(Collectors.toList()); } catch (InterruptedException e) { log.warn(e, "Interrupted while polling"); @@ -1059,11 +1040,22 @@ private void filterBufferAndResetBackgroundFetch(Set> pa } // filter records in buffer and only retain ones whose partition was not seeked - BlockingQueue> newQ = new LinkedBlockingQueue<>(recordBufferSize); + MemoryBoundLinkedBlockingQueue> newQ = + new MemoryBoundLinkedBlockingQueue<>(recordBufferSizeBytes); records.stream() - .filter(x -> !partitions.contains(x.getStreamPartition())) - .forEachOrdered(newQ::offer); + .filter(x -> !partitions.contains(x.getData().getStreamPartition())) + .forEachOrdered(x -> { + if (!newQ.offer(x)) { + // this should never really happen in practice but adding check here for safety. + throw DruidException.defensive("Failed to insert item to new queue when resetting background fetch. 
" + + "[stream: '%s', partitionId: '%s', sequenceNumber: '%s']", + x.getData().getStream(), + x.getData().getPartitionId(), + x.getData().getSequenceNumber() + ); + } + }); records = newQ; diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java index 29e909eb0bb7..81f8b774f042 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java @@ -73,14 +73,12 @@ protected KinesisRecordSupplier createRecordSupplier() ioConfig.getAwsAssumedRoleArn(), ioConfig.getAwsExternalId() ), - ioConfig.getRecordsPerFetch() != null ? ioConfig.getRecordsPerFetch() : DEFAULT_RECORDS_PER_FETCH, ioConfig.getFetchDelayMillis(), 1, - ioConfig.isDeaggregate(), - tuningConfig.getRecordBufferSizeOrDefault(Runtime.getRuntime().maxMemory(), ioConfig.isDeaggregate()), + tuningConfig.getRecordBufferSizeBytesOrDefault(Runtime.getRuntime().maxMemory()), tuningConfig.getRecordBufferOfferTimeout(), tuningConfig.getRecordBufferFullWait(), - tuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate()), + tuningConfig.getMaxBytesPerPollOrDefault(), ioConfig.isUseEarliestSequenceNumber(), tuningConfig.isUseListShards() ); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index accd8316f66f..a142f4147627 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -143,11 +143,9 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( maximumMessageTime, ioConfig.getInputFormat(), ioConfig.getEndpoint(), - ioConfig.getRecordsPerFetch(), ioConfig.getFetchDelayMillis(), ioConfig.getAwsAssumedRoleArn(), - ioConfig.getAwsExternalId(), - ioConfig.isDeaggregate() + ioConfig.getAwsExternalId() ); } @@ -197,14 +195,12 @@ protected RecordSupplier setupRecordSupplier() throw ioConfig.getAwsAssumedRoleArn(), ioConfig.getAwsExternalId() ), - 0, // no records-per-fetch, it is not used ioConfig.getFetchDelayMillis(), 0, // skip starting background fetch, it is not used - ioConfig.isDeaggregate(), - taskTuningConfig.getRecordBufferSizeOrDefault(Runtime.getRuntime().maxMemory(), ioConfig.isDeaggregate()), + taskTuningConfig.getRecordBufferSizeBytesOrDefault(Runtime.getRuntime().maxMemory()), taskTuningConfig.getRecordBufferOfferTimeout(), taskTuningConfig.getRecordBufferFullWait(), - taskTuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate()), + taskTuningConfig.getMaxBytesPerPollOrDefault(), ioConfig.isUseEarliestSequenceNumber(), spec.getSpec().getTuningConfig().isUseListShards() ); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java index a568aea263ca..9910f22a349b 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java @@ -71,12 +71,12 @@ public KinesisSupervisorIOConfig( @JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod, 
@JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod, @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime, - @JsonProperty("recordsPerFetch") Integer recordsPerFetch, + @JsonProperty("recordsPerFetch") @Deprecated Integer recordsPerFetch, @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis, @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn, @JsonProperty("awsExternalId") String awsExternalId, @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig, - @JsonProperty("deaggregate") boolean deaggregate + @JsonProperty("deaggregate") @Deprecated boolean deaggregate ) { super( diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 357bc9e57ca4..a0a68c14bc0b 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -74,6 +74,8 @@ public static KinesisSupervisorTuningConfig defaultConfig() null, null, null, + null, + null, null ); } @@ -98,14 +100,16 @@ public KinesisSupervisorTuningConfig( @JsonProperty("chatRetries") Long chatRetries, @JsonProperty("httpTimeout") Period httpTimeout, @JsonProperty("shutdownTimeout") Period shutdownTimeout, - @JsonProperty("recordBufferSize") Integer recordBufferSize, + @JsonProperty("recordBufferSize") @Deprecated @Nullable Integer recordBufferSize, + @JsonProperty("recordBufferSizeBytes") Integer recordBufferSizeBytes, @JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout, @JsonProperty("recordBufferFullWait") Integer recordBufferFullWait, 
@JsonProperty("fetchThreads") Integer fetchThreads, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, - @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, + @JsonProperty("maxRecordsPerPoll") @Deprecated @Nullable Integer maxRecordsPerPoll, + @JsonProperty("maxBytesPerPoll") @Nullable Integer maxBytesPerPoll, @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration, @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod, @@ -129,6 +133,7 @@ public KinesisSupervisorTuningConfig( resetOffsetAutomatically, skipSequenceNumberAvailabilityCheck, recordBufferSize, + recordBufferSizeBytes, recordBufferOfferTimeout, recordBufferFullWait, fetchThreads, @@ -137,6 +142,7 @@ public KinesisSupervisorTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, + maxBytesPerPoll, intermediateHandoffPeriod ); @@ -225,7 +231,7 @@ public String toString() ", chatRetries=" + chatRetries + ", httpTimeout=" + httpTimeout + ", shutdownTimeout=" + shutdownTimeout + - ", recordBufferSize=" + getRecordBufferSizeConfigured() + + ", recordBufferSizeBytes=" + getRecordBufferSizeBytesConfigured() + ", recordBufferOfferTimeout=" + getRecordBufferOfferTimeout() + ", recordBufferFullWait=" + getRecordBufferFullWait() + ", fetchThreads=" + getFetchThreads() + @@ -234,6 +240,7 @@ public String toString() ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + ", maxRecordsPerPoll=" + getMaxRecordsPerPollConfigured() + + ", maxBytesPerPoll=" + getMaxBytesPerPollConfigured() + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + ", repartitionTransitionDuration=" + getRepartitionTransitionDuration() + ", 
useListShards=" + isUseListShards() + @@ -260,6 +267,7 @@ public KinesisIndexTaskTuningConfig convertToTaskTuningConfig() isResetOffsetAutomatically(), isSkipSequenceNumberAvailabilityCheck(), getRecordBufferSizeConfigured(), + getRecordBufferSizeBytesConfigured(), getRecordBufferOfferTimeout(), getRecordBufferFullWait(), getFetchThreads(), @@ -268,6 +276,7 @@ public KinesisIndexTaskTuningConfig convertToTaskTuningConfig() getMaxParseExceptions(), getMaxSavedParseExceptions(), getMaxRecordsPerPollConfigured(), + getMaxBytesPerPollConfigured(), getIntermediateHandoffPeriod() ); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java index 0d6af4343aae..3162b2ea0eee 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIOConfigTest.java @@ -93,13 +93,10 @@ public void testSerdeWithDefaults() throws Exception Assert.assertTrue(config.isUseTransaction()); Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent()); Assert.assertEquals(config.getEndpoint(), "kinesis.us-east-1.amazonaws.com"); - Assert.assertNull(config.getRecordsPerFetchConfigured()); - Assert.assertEquals(config.getRecordsPerFetchOrDefault(1_000_000_000, 4), 1250); Assert.assertEquals(config.getFetchDelayMillis(), 0); Assert.assertEquals(Collections.emptySet(), config.getStartSequenceNumbers().getExclusivePartitions()); Assert.assertNull(config.getAwsAssumedRoleArn()); Assert.assertNull(config.getAwsExternalId()); - Assert.assertFalse(config.isDeaggregate()); } @Test @@ -115,11 +112,9 @@ public void testSerdeWithNonDefaults() throws Exception + " \"minimumMessageTime\": \"2016-05-31T12:00Z\",\n" + " 
\"maximumMessageTime\": \"2016-05-31T14:00Z\",\n" + " \"endpoint\": \"kinesis.us-east-2.amazonaws.com\",\n" - + " \"recordsPerFetch\": 1000,\n" + " \"fetchDelayMillis\": 1000,\n" + " \"awsAssumedRoleArn\": \"role\",\n" - + " \"awsExternalId\": \"awsexternalid\",\n" - + " \"deaggregate\": true\n" + + " \"awsExternalId\": \"awsexternalid\"\n" + "}"; KinesisIndexTaskIOConfig config = (KinesisIndexTaskIOConfig) mapper.readValue( @@ -150,11 +145,9 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get()); Assert.assertEquals(config.getEndpoint(), "kinesis.us-east-2.amazonaws.com"); Assert.assertEquals(config.getStartSequenceNumbers().getExclusivePartitions(), ImmutableSet.of("0")); - Assert.assertEquals(1000, (int) config.getRecordsPerFetchConfigured()); Assert.assertEquals(1000, config.getFetchDelayMillis()); Assert.assertEquals("role", config.getAwsAssumedRoleArn()); Assert.assertEquals("awsexternalid", config.getAwsExternalId()); - Assert.assertTrue(config.isDeaggregate()); } @Test @@ -272,11 +265,9 @@ public void testDeserializeToOldIoConfig() throws IOException DateTimes.nowUtc(), null, "endpoint", - 1000, 2000, "awsAssumedRoleArn", - "awsExternalId", - true + "awsExternalId" ); final byte[] json = mapper.writeValueAsBytes(currentConfig); @@ -302,11 +293,9 @@ public void testDeserializeToOldIoConfig() throws IOException Assert.assertEquals(currentConfig.getMinimumMessageTime(), oldConfig.getMinimumMessageTime()); Assert.assertEquals(currentConfig.getMaximumMessageTime(), oldConfig.getMaximumMessageTime()); Assert.assertEquals(currentConfig.getEndpoint(), oldConfig.getEndpoint()); - Assert.assertEquals((int) currentConfig.getRecordsPerFetchConfigured(), oldConfig.getRecordsPerFetch()); Assert.assertEquals(currentConfig.getFetchDelayMillis(), oldConfig.getFetchDelayMillis()); Assert.assertEquals(currentConfig.getAwsAssumedRoleArn(), oldConfig.getAwsAssumedRoleArn()); 
Assert.assertEquals(currentConfig.getAwsExternalId(), oldConfig.getAwsExternalId()); - Assert.assertEquals(currentConfig.isDeaggregate(), oldConfig.isDeaggregate()); } @Test @@ -324,11 +313,9 @@ public void testDeserializeFromOldIoConfig() throws IOException DateTimes.nowUtc(), DateTimes.nowUtc(), "endpoint", - 1000, 2000, "awsAssumedRoleArn", - "awsExternalId", - true + "awsExternalId" ); final byte[] json = oldMapper.writeValueAsBytes(oldConfig); @@ -349,11 +336,9 @@ public void testDeserializeFromOldIoConfig() throws IOException Assert.assertEquals(oldConfig.getMinimumMessageTime(), currentConfig.getMinimumMessageTime()); Assert.assertEquals(oldConfig.getMaximumMessageTime(), currentConfig.getMaximumMessageTime()); Assert.assertEquals(oldConfig.getEndpoint(), currentConfig.getEndpoint()); - Assert.assertEquals(oldConfig.getRecordsPerFetch(), (int) currentConfig.getRecordsPerFetchConfigured()); Assert.assertEquals(oldConfig.getFetchDelayMillis(), currentConfig.getFetchDelayMillis()); Assert.assertEquals(oldConfig.getAwsAssumedRoleArn(), currentConfig.getAwsAssumedRoleArn()); Assert.assertEquals(oldConfig.getAwsExternalId(), currentConfig.getAwsExternalId()); - Assert.assertEquals(oldConfig.isDeaggregate(), currentConfig.isDeaggregate()); } private static class OldKinesisIndexTaskIoConfig implements IOConfig @@ -366,12 +351,9 @@ private static class OldKinesisIndexTaskIoConfig implements IOConfig private final Optional minimumMessageTime; private final Optional maximumMessageTime; private final String endpoint; - private final Integer recordsPerFetch; private final Integer fetchDelayMillis; - private final String awsAssumedRoleArn; private final String awsExternalId; - private final boolean deaggregate; @JsonCreator private OldKinesisIndexTaskIoConfig( @@ -383,11 +365,9 @@ private OldKinesisIndexTaskIoConfig( @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, @JsonProperty("maximumMessageTime") DateTime maximumMessageTime, @JsonProperty("endpoint") 
String endpoint, - @JsonProperty("recordsPerFetch") Integer recordsPerFetch, @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis, @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn, - @JsonProperty("awsExternalId") String awsExternalId, - @JsonProperty("deaggregate") boolean deaggregate + @JsonProperty("awsExternalId") String awsExternalId ) { this.baseSequenceName = baseSequenceName; @@ -398,11 +378,9 @@ private OldKinesisIndexTaskIoConfig( this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); this.maximumMessageTime = Optional.fromNullable(maximumMessageTime); this.endpoint = endpoint; - this.recordsPerFetch = recordsPerFetch; this.fetchDelayMillis = fetchDelayMillis; this.awsAssumedRoleArn = awsAssumedRoleArn; this.awsExternalId = awsExternalId; - this.deaggregate = deaggregate; } @JsonProperty @@ -453,12 +431,6 @@ public String getEndpoint() return endpoint; } - @JsonProperty - public int getRecordsPerFetch() - { - return recordsPerFetch; - } - @JsonProperty public int getFetchDelayMillis() { @@ -476,11 +448,5 @@ public String getAwsExternalId() { return awsExternalId; } - - @JsonProperty - public boolean isDeaggregate() - { - return deaggregate; - } } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index 2cba3f54187f..da04e3ab0a6d 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -76,6 +76,8 @@ public class KinesisIndexTaskSerdeTest null, null, null, + null, + null, null ); private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig( @@ -90,9 +92,7 @@ public class KinesisIndexTaskSerdeTest 
"endpoint", null, null, - null, - null, - false + null ); private static final String ACCESS_KEY = "test-access-key"; private static final String SECRET_KEY = "test-secret-key"; diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 69516979f3e1..e14b6679b09e 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -184,6 +184,7 @@ public static Iterable constructorFeeder() private Long maxTotalRows = null; private final Period intermediateHandoffPeriod = null; private int maxRecordsPerPoll; + private int maxBytesPerPoll; @BeforeClass public static void setupClass() @@ -218,6 +219,7 @@ public void setupTest() throws IOException, InterruptedException doHandoff = true; reportsFile = File.createTempFile("KinesisIndexTaskTestReports-" + System.currentTimeMillis(), "json"); maxRecordsPerPoll = 1; + maxBytesPerPoll = 1_000_000; recordSupplier = mock(KinesisRecordSupplier.class); @@ -562,6 +564,7 @@ public void testIncrementalHandOff() throws Exception // as soon as any segment has more than one record, incremental publishing should happen maxRowsPerSegment = 2; maxRecordsPerPoll = 1; + maxBytesPerPoll = 1_000_000; recordSupplier.assign(EasyMock.anyObject()); EasyMock.expectLastCall().anyTimes(); @@ -779,9 +782,7 @@ public void testRunWithMinimumMessageTime() throws Exception "awsEndpoint", null, null, - null, - null, - false + null ) ); @@ -843,9 +844,7 @@ public void testRunWithMaximumMessageTime() throws Exception "awsEndpoint", null, null, - null, - null, - false + null ) ); @@ -1697,6 +1696,7 @@ public void testRestoreAfterPersistingSequences() throws Exception { maxRowsPerSegment = 
2; maxRecordsPerPoll = 1; + maxBytesPerPoll = 1_000_000; List> records = clone(SINGLE_PARTITION_RECORDS); @@ -1935,9 +1935,7 @@ public void testRunContextSequenceAheadOfStartingOffsets() throws Exception "awsEndpoint", null, null, - null, - null, - false + null ), context ); @@ -2099,9 +2097,7 @@ public void testSequencesFromContext() throws IOException "awsEndpoint", null, null, - null, - null, - false + null ), context ); @@ -2250,10 +2246,15 @@ public void testRunWithoutDataInserted() throws Exception public void testComputeFetchThreads() { final DruidProcessingConfigTest.MockRuntimeInfo runtimeInfo = - new DruidProcessingConfigTest.MockRuntimeInfo(3, 1000, 2000); + new DruidProcessingConfigTest.MockRuntimeInfo(3, 1000, 10_000_000_000L); Assert.assertEquals(6, KinesisIndexTask.computeFetchThreads(runtimeInfo, null)); Assert.assertEquals(2, KinesisIndexTask.computeFetchThreads(runtimeInfo, 2)); + + final DruidProcessingConfigTest.MockRuntimeInfo runtimeInfo2 = + new DruidProcessingConfigTest.MockRuntimeInfo(3, 1000, 1_000_000_000); + Assert.assertEquals(5, KinesisIndexTask.computeFetchThreads(runtimeInfo2, null)); + Assert.assertEquals(5, KinesisIndexTask.computeFetchThreads(runtimeInfo2, 6)); Assert.assertThrows( IllegalArgumentException.class, () -> KinesisIndexTask.computeFetchThreads(runtimeInfo, 0) @@ -2297,9 +2298,7 @@ private KinesisIndexTask createTask( "awsEndpoint", null, null, - null, - null, - false + null ), null ); @@ -2358,10 +2357,12 @@ private KinesisIndexTask createTask( null, null, null, + null, logParseExceptions, maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, + maxBytesPerPoll, intermediateHandoffPeriod ); return createTask(taskId, dataSchema, ioConfig, tuningConfig, context); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index cd99521c18e8..b61c5cf2ae48 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -77,8 +77,10 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec()); Assert.assertFalse(config.isReportParseExceptions()); Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout()); - Assert.assertNull(config.getRecordBufferSizeConfigured()); - Assert.assertEquals(10000, config.getRecordBufferSizeOrDefault(1_000_000_000, false)); + Assert.assertNull(config.getRecordBufferSizeBytesConfigured()); + Assert.assertEquals(100_000_000, config.getRecordBufferSizeBytesOrDefault(2_000_000_000)); + Assert.assertEquals(100_000_000, config.getRecordBufferSizeBytesOrDefault(1_000_000_000)); + Assert.assertEquals(10_000_000, config.getRecordBufferSizeBytesOrDefault(100_000_000)); Assert.assertEquals(5000, config.getRecordBufferOfferTimeout()); Assert.assertEquals(5000, config.getRecordBufferFullWait()); Assert.assertNull(config.getFetchThreads()); @@ -98,7 +100,7 @@ public void testSerdeWithNonDefaults() throws Exception + " \"maxPendingPersists\": 100,\n" + " \"reportParseExceptions\": true,\n" + " \"handoffConditionTimeout\": 100,\n" - + " \"recordBufferSize\": 1000,\n" + + " \"recordBufferSizeBytes\": 1000,\n" + " \"recordBufferOfferTimeout\": 500,\n" + " \"recordBufferFullWait\": 500,\n" + " \"resetOffsetAutomatically\": false,\n" @@ -125,8 +127,8 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(100, config.getMaxPendingPersists()); Assert.assertTrue(config.isReportParseExceptions()); Assert.assertEquals(100, 
config.getHandoffConditionTimeout()); - Assert.assertEquals(1000, (int) config.getRecordBufferSizeConfigured()); - Assert.assertEquals(1000, config.getRecordBufferSizeOrDefault(1_000_000_000, false)); + Assert.assertEquals(1000, (int) config.getRecordBufferSizeBytesConfigured()); + Assert.assertEquals(1000, config.getRecordBufferSizeBytesOrDefault(1_000_000_000)); Assert.assertEquals(500, config.getRecordBufferOfferTimeout()); Assert.assertEquals(500, config.getRecordBufferFullWait()); Assert.assertEquals(2, (int) config.getFetchThreads()); @@ -153,6 +155,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException 5L, true, false, + null, 1000, 1000, 500, @@ -162,6 +165,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException 500, 500, 6000, + 1_000_000, new Period("P3D") ); @@ -190,7 +194,9 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait()); Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout()); Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured()); + Assert.assertEquals(base.getRecordBufferSizeBytesConfigured(), deserialized.getRecordBufferSizeBytesConfigured()); Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured()); + Assert.assertEquals(base.getMaxBytesPerPollConfigured(), deserialized.getMaxBytesPerPollConfigured()); } @Test @@ -212,6 +218,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException 5L, true, false, + null, 1000, 1000, 500, @@ -220,6 +227,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException false, 500, 500, + 1_000_000, 6000, new Period("P3D") ); @@ -247,7 +255,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException 
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait()); Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout()); - Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured()); + Assert.assertEquals(base.getRecordBufferSizeBytesConfigured(), deserialized.getRecordBufferSizeBytesConfigured()); Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured()); } @@ -301,6 +309,7 @@ public void testConvert() null, null, null, + null, 1000, 500, 500, @@ -309,6 +318,7 @@ public void testConvert() null, null, 10, + 1_000_000, null, null, null, @@ -327,7 +337,7 @@ public void testConvert() Assert.assertEquals(IndexSpec.DEFAULT, copy.getIndexSpec()); Assert.assertTrue(copy.isReportParseExceptions()); Assert.assertEquals(5L, copy.getHandoffConditionTimeout()); - Assert.assertEquals(1000, (int) copy.getRecordBufferSizeConfigured()); + Assert.assertEquals(1000, (int) copy.getRecordBufferSizeBytesConfigured()); Assert.assertEquals(500, copy.getRecordBufferOfferTimeout()); Assert.assertEquals(500, copy.getRecordBufferFullWait()); Assert.assertEquals(2, (int) copy.getFetchThreads()); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java index af0755fcd4bb..5fcf81139eb6 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplierTest.java @@ -147,8 +147,6 @@ private static ByteBuffer jb(String timestamp, String dim1, String dim2, String 
throw new RuntimeException(e); } } - - private static int recordsPerFetch; private static AmazonKinesis kinesis; private static ListShardsResult listShardsResult0; private static ListShardsResult listShardsResult1; @@ -180,7 +178,6 @@ public void setupTest() getRecordsResult1 = createMock(GetRecordsResult.class); shard0 = createMock(Shard.class); shard1 = createMock(Shard.class); - recordsPerFetch = 1; } @After @@ -219,14 +216,12 @@ public void testSupplierSetup_withoutListShards() recordSupplier = new KinesisRecordSupplier( kinesis, - recordsPerFetch, 0, 2, - false, 100, 5000, 5000, - 5, + 1_000_000, true, false ); @@ -278,14 +273,12 @@ public void testSupplierSetup_withListShards() recordSupplier = new KinesisRecordSupplier( kinesis, - recordsPerFetch, 0, 2, - false, 100, 5000, 5000, - 5, + 1_000_000, true, true ); @@ -312,7 +305,12 @@ public void testSupplierSetup_withListShards() Assert.assertEquals(expectedRequest1, capturedRequest1.getValue()); } - private static GetRecordsRequest generateGetRecordsReq(String shardIterator, int limit) + private static GetRecordsRequest generateGetRecordsReq(String shardIterator) + { + return new GetRecordsRequest().withShardIterator(shardIterator); + } + + private static GetRecordsRequest generateGetRecordsWithLimitReq(String shardIterator, int limit) { return new GetRecordsRequest().withShardIterator(shardIterator).withLimit(limit); } @@ -326,87 +324,9 @@ private static List> clea .collect(Collectors.toList()); } - @Test - public void testPoll() throws InterruptedException - { - recordsPerFetch = 100; - - EasyMock.expect(kinesis.getShardIterator( - EasyMock.anyObject(), - EasyMock.eq(SHARD_ID0), - EasyMock.anyString(), - EasyMock.anyString() - )).andReturn( - getShardIteratorResult0).anyTimes(); - - EasyMock.expect(kinesis.getShardIterator( - EasyMock.anyObject(), - EasyMock.eq(SHARD_ID1), - EasyMock.anyString(), - EasyMock.anyString() - )).andReturn( - getShardIteratorResult1).anyTimes(); - - 
EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); - EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) - .andReturn(getRecordsResult0) - .anyTimes(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch))) - .andReturn(getRecordsResult1) - .anyTimes(); - EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once(); - EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS).once(); - EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); - EasyMock.expect(getRecordsResult1.getNextShardIterator()).andReturn(null).anyTimes(); - EasyMock.expect(getRecordsResult0.getMillisBehindLatest()).andReturn(SHARD0_LAG_MILLIS).once(); - EasyMock.expect(getRecordsResult1.getMillisBehindLatest()).andReturn(SHARD1_LAG_MILLIS).once(); - - replayAll(); - - Set> partitions = ImmutableSet.of( - StreamPartition.of(STREAM, SHARD_ID0), - StreamPartition.of(STREAM, SHARD_ID1) - ); - - - recordSupplier = new KinesisRecordSupplier( - kinesis, - recordsPerFetch, - 0, - 2, - false, - 100, - 5000, - 5000, - 100, - true, - false - ); - - recordSupplier.assign(partitions); - recordSupplier.seekToEarliest(partitions); - recordSupplier.start(); - - while (recordSupplier.bufferSize() < 12) { - Thread.sleep(100); - } - - List> polledRecords = cleanRecords(recordSupplier.poll( - POLL_TIMEOUT_MILLIS)); - - verifyAll(); - - Assert.assertEquals(partitions, recordSupplier.getAssignment()); - Assert.assertTrue(polledRecords.containsAll(ALL_RECORDS)); - Assert.assertEquals(SHARDS_LAG_MILLIS, recordSupplier.getPartitionResourcesTimeLag()); - } - @Test public void testPollWithKinesisInternalFailure() throws InterruptedException { - recordsPerFetch = 100; - EasyMock.expect(kinesis.getShardIterator( 
EasyMock.anyObject(), EasyMock.eq(SHARD_ID0), @@ -425,10 +345,10 @@ public void testPollWithKinesisInternalFailure() throws InterruptedException EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR))) .andReturn(getRecordsResult0) .anyTimes(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch))) + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR))) .andReturn(getRecordsResult1) .anyTimes(); AmazonServiceException getException = new AmazonServiceException("InternalFailure"); @@ -460,14 +380,12 @@ public void testPollWithKinesisInternalFailure() throws InterruptedException recordSupplier = new KinesisRecordSupplier( kinesis, - recordsPerFetch, 0, 2, - false, - 100, + 10_000, 5000, 5000, - 100, + 1_000_000, true, false ); @@ -493,8 +411,6 @@ public void testPollWithKinesisInternalFailure() throws InterruptedException @Test public void testPollWithKinesisNonRetryableFailure() throws InterruptedException { - recordsPerFetch = 100; - EasyMock.expect(kinesis.getShardIterator( EasyMock.anyObject(), EasyMock.eq(SHARD_ID0), @@ -508,7 +424,7 @@ public void testPollWithKinesisNonRetryableFailure() throws InterruptedException getException.setStatusCode(400); getException.setServiceName("AmazonKinesis"); EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR))) .andThrow(getException) .once(); @@ -521,14 +437,12 @@ public void testPollWithKinesisNonRetryableFailure() throws InterruptedException 
recordSupplier = new KinesisRecordSupplier( kinesis, - recordsPerFetch, 0, 1, - false, 100, 5000, 5000, - 100, + 1_000_000, true, false ); @@ -556,8 +470,6 @@ public void testPollWithKinesisNonRetryableFailure() throws InterruptedException public void testSeek() throws InterruptedException { - recordsPerFetch = 100; - EasyMock.expect(kinesis.getShardIterator( EasyMock.anyObject(), EasyMock.eq(SHARD_ID0), @@ -576,10 +488,10 @@ public void testSeek() EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR))) .andReturn(getRecordsResult0) .anyTimes(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch))) + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR))) .andReturn(getRecordsResult1) .anyTimes(); EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS.subList(1, SHARD0_RECORDS.size())).once(); @@ -600,14 +512,12 @@ public void testSeek() recordSupplier = new KinesisRecordSupplier( kinesis, - recordsPerFetch, 0, 2, - false, - 100, + 10_000, 5000, 5000, - 100, + 1_000_000, true, false ); @@ -636,8 +546,6 @@ public void testSeek() public void testSeekToLatest() throws InterruptedException { - recordsPerFetch = 100; - EasyMock.expect(kinesis.getShardIterator( EasyMock.anyObject(), EasyMock.eq(SHARD_ID0), @@ -668,14 +576,12 @@ public void testSeekToLatest() recordSupplier = new KinesisRecordSupplier( kinesis, - recordsPerFetch, 0, 2, - false, 100, 5000, 5000, - 100, + 1_000_000, true, false ); @@ -703,14 +609,12 @@ public void testSeekUnassigned() throws InterruptedException recordSupplier = new KinesisRecordSupplier( kinesis, - 1, 0, 2, - false, 100, 5000, 5000, - 5, + 1_000_000, 
true, false ); @@ -725,7 +629,6 @@ public void testPollAfterSeek() throws InterruptedException { // tests that after doing a seek, the now invalid records in buffer is cleaned up properly - recordsPerFetch = 100; EasyMock.expect(kinesis.getShardIterator( EasyMock.anyObject(), @@ -745,10 +648,10 @@ public void testPollAfterSeek() EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).once(); EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).once(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch))) + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR))) .andReturn(getRecordsResult1) .once(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR))) .andReturn(getRecordsResult0) .once(); EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS.subList(5, SHARD1_RECORDS.size())).once(); @@ -766,14 +669,12 @@ public void testPollAfterSeek() recordSupplier = new KinesisRecordSupplier( kinesis, - recordsPerFetch, 0, 2, - false, - 100, + 10_000, 5000, 5000, - 1, + 1_000_000, true, false ); @@ -816,8 +717,6 @@ public void testPollAfterSeek() @Test public void testPollDeaggregate() throws InterruptedException { - recordsPerFetch = 100; - EasyMock.expect(kinesis.getShardIterator( EasyMock.anyObject(), EasyMock.eq(SHARD_ID0), @@ -836,10 +735,10 @@ public void testPollDeaggregate() throws InterruptedException EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR))) .andReturn(getRecordsResult0) .anyTimes(); 
- EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch))) + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR))) .andReturn(getRecordsResult1) .anyTimes(); EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).once(); @@ -859,14 +758,12 @@ public void testPollDeaggregate() throws InterruptedException recordSupplier = new KinesisRecordSupplier( kinesis, - recordsPerFetch, 0, 2, - true, - 100, + 10_000, 5000, 5000, - 100, + 1_000_000, true, false ); @@ -922,7 +819,7 @@ public void getLatestSequenceNumberWhenKinesisRetryableException() EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).once(); AmazonClientException ex = new AmazonClientException(new IOException()); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 1000))) + EasyMock.expect(kinesis.getRecords(generateGetRecordsWithLimitReq(SHARD0_ITERATOR, 1000))) .andThrow(ex) .andReturn(getRecordsResult0) .once(); @@ -935,14 +832,12 @@ public void getLatestSequenceNumberWhenKinesisRetryableException() recordSupplier = new KinesisRecordSupplier( kinesis, - recordsPerFetch, 0, 2, - true, - 100, + 10_000, 5000, 5000, - 100, + 1_000_000, true, false ); @@ -961,7 +856,7 @@ private KinesisRecordSupplier getSequenceNumberWhenNoRecordsHelperForOpenShard() EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).times(1); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, 1000))) + EasyMock.expect(kinesis.getRecords(generateGetRecordsWithLimitReq(SHARD0_ITERATOR, 1000))) .andReturn(getRecordsResult0) .times(1); @@ -972,14 +867,12 @@ private KinesisRecordSupplier getSequenceNumberWhenNoRecordsHelperForOpenShard() recordSupplier = new KinesisRecordSupplier( kinesis, - recordsPerFetch, 0, 2, - true, - 100, + 10_000, 5000, 5000, - 100, + 1_000_000, true, false ); @@ -1033,12 +926,18 @@ public void getPartitionTimeLag() throws 
InterruptedException EasyMock.expect(getShardIteratorResult0.getShardIterator()).andReturn(SHARD0_ITERATOR).anyTimes(); EasyMock.expect(getShardIteratorResult1.getShardIterator()).andReturn(SHARD1_ITERATOR).anyTimes(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR, recordsPerFetch))) + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD0_ITERATOR))) .andReturn(getRecordsResult0) .anyTimes(); - EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR, recordsPerFetch))) + EasyMock.expect(kinesis.getRecords(generateGetRecordsReq(SHARD1_ITERATOR))) .andReturn(getRecordsResult1) .anyTimes(); + EasyMock.expect(kinesis.getRecords(generateGetRecordsWithLimitReq(SHARD0_ITERATOR, 1))) + .andReturn(getRecordsResult0) + .anyTimes(); + EasyMock.expect(kinesis.getRecords(generateGetRecordsWithLimitReq(SHARD1_ITERATOR, 1))) + .andReturn(getRecordsResult1) + .anyTimes(); EasyMock.expect(getRecordsResult0.getRecords()).andReturn(SHARD0_RECORDS).times(2); EasyMock.expect(getRecordsResult1.getRecords()).andReturn(SHARD1_RECORDS_EMPTY).times(2); EasyMock.expect(getRecordsResult0.getNextShardIterator()).andReturn(null).anyTimes(); @@ -1055,14 +954,12 @@ public void getPartitionTimeLag() throws InterruptedException recordSupplier = new KinesisRecordSupplier( kinesis, - recordsPerFetch, 0, 2, - true, - 100, + 10_000, 5000, 5000, - 100, + 1_000_000, true, false ); @@ -1110,17 +1007,16 @@ public void getPartitionTimeLag() throws InterruptedException public void testIsOffsetAvailable() { AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class); - KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis, - recordsPerFetch, - 0, - 2, - false, - 100, - 5000, - 5000, - 5, - true, - false + KinesisRecordSupplier target = new KinesisRecordSupplier( + mockKinesis, + 0, + 2, + 100, + 5000, + 5000, + 1_000_000, + true, + false ); StreamPartition partition = new StreamPartition<>(STREAM, SHARD_ID0); diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java index fabde1852aad..4de35cf5e5d0 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java @@ -70,11 +70,9 @@ public void testSerdeWithDefaults() throws Exception Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent()); Assert.assertFalse("earlyMessageRejectionPeriod", config.getEarlyMessageRejectionPeriod().isPresent()); Assert.assertFalse("lateMessageRejectionStartDateTime", config.getLateMessageRejectionStartDateTime().isPresent()); - Assert.assertNull(config.getRecordsPerFetch()); Assert.assertEquals(0, config.getFetchDelayMillis()); Assert.assertNull(config.getAwsAssumedRoleArn()); Assert.assertNull(config.getAwsExternalId()); - Assert.assertFalse(config.isDeaggregate()); } @Test @@ -94,11 +92,9 @@ public void testSerdeWithNonDefaults() throws Exception + " \"completionTimeout\": \"PT45M\",\n" + " \"lateMessageRejectionPeriod\": \"PT1H\",\n" + " \"earlyMessageRejectionPeriod\": \"PT1H\",\n" - + " \"recordsPerFetch\": 4000,\n" + " \"fetchDelayMillis\": 1000,\n" + " \"awsAssumedRoleArn\": \"role\",\n" - + " \"awsExternalId\": \"awsexternalid\",\n" - + " \"deaggregate\": true\n" + + " \"awsExternalId\": \"awsexternalid\"\n" + "}"; KinesisSupervisorIOConfig config = mapper.readValue( @@ -117,11 +113,9 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(Duration.standardMinutes(45), config.getCompletionTimeout()); Assert.assertEquals(Duration.standardHours(1), 
config.getLateMessageRejectionPeriod().get()); Assert.assertEquals(Duration.standardHours(1), config.getEarlyMessageRejectionPeriod().get()); - Assert.assertEquals((Integer) 4000, config.getRecordsPerFetch()); Assert.assertEquals(1000, config.getFetchDelayMillis()); Assert.assertEquals("role", config.getAwsAssumedRoleArn()); Assert.assertEquals("awsexternalid", config.getAwsExternalId()); - Assert.assertTrue(config.isDeaggregate()); } @Test diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index c9b6406bd3f9..87297dd8f281 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -208,6 +208,8 @@ public void setupTest() null, null, null, + null, + null, null ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -318,7 +320,6 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception null, false, null, - null, autoScalerConfig ); KinesisSupervisorSpec kinesisSupervisorSpec = supervisor.getKinesisSupervisorSpec(); @@ -392,7 +393,6 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception null, false, null, - null, autoScalerConfig ); @@ -509,26 +509,26 @@ public void testKinesisIOConfigInitAndAutoscalerConfigCreation() { // create KinesisSupervisorIOConfig with autoScalerConfig null KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithNullAutoScalerConfig = new KinesisSupervisorIOConfig( - STREAM, - INPUT_FORMAT, - "awsEndpoint", - null, - 1, - 1, - new Period("PT30M"), - new Period("P1D"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, - null, - 100, - 1000, - null, 
- null, - null, - false + STREAM, + INPUT_FORMAT, + "awsEndpoint", + null, + 1, + 1, + new Period("PT30M"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + 100, + 1000, + null, + null, + null, + false ); AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoScalerConfig(); @@ -536,26 +536,26 @@ public void testKinesisIOConfigInitAndAutoscalerConfigCreation() // create KinesisSupervisorIOConfig with autoScalerConfig Empty KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithEmptyAutoScalerConfig = new KinesisSupervisorIOConfig( - STREAM, - INPUT_FORMAT, - "awsEndpoint", - null, - 1, - 1, - new Period("PT30M"), - new Period("P1D"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, - null, - 100, - 1000, - null, - null, - OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class), - false + STREAM, + INPUT_FORMAT, + "awsEndpoint", + null, + 1, + 1, + new Period("PT30M"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + 100, + 1000, + null, + null, + OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class), + false ); AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoScalerConfig(); @@ -3740,7 +3740,6 @@ public void testDoNotKillCompatibleTasks() new Period("P1D"), new Period("P1D"), false, - 42, 1000, true ); @@ -3838,7 +3837,6 @@ public void testKillIncompatibleTasks() new Period("P1D"), new Period("P1D"), false, - 42, 1000, false ); @@ -3924,7 +3922,6 @@ public void testIsTaskCurrent() new Period("P1D"), false, 42, - 42, dataSchema, tuningConfig ); @@ -3967,7 +3964,9 @@ public void testIsTaskCurrent() null, null, null, + null, 42, // This property is different from tuningConfig + 1_000_000, null, null, null, @@ -4074,7 +4073,6 @@ public void testSequenceNameDoesNotChangeWithTaskId() new Period("P1D"), false, 42, - 42, dataSchema, 
tuningConfig ); @@ -5149,6 +5147,8 @@ public SeekableStreamIndexTaskClient build( null, null, null, + null, + null, null ); @@ -5198,7 +5198,6 @@ private TestableKinesisSupervisor getTestableSupervisor( earlyMessageRejectionPeriod, false, null, - null, null ); } @@ -5211,7 +5210,6 @@ private TestableKinesisSupervisor getTestableSupervisor( Period lateMessageRejectionPeriod, Period earlyMessageRejectionPeriod, boolean suspended, - Integer recordsPerFetch, Integer fetchDelayMillis, AutoScalerConfig autoScalerConfig ) @@ -5231,7 +5229,7 @@ private TestableKinesisSupervisor getTestableSupervisor( lateMessageRejectionPeriod, earlyMessageRejectionPeriod, null, - recordsPerFetch, + null, fetchDelayMillis, null, null, @@ -5301,7 +5299,6 @@ private TestableKinesisSupervisor getTestableSupervisorCustomIsTaskCurrent( Period lateMessageRejectionPeriod, Period earlyMessageRejectionPeriod, boolean suspended, - Integer recordsPerFetch, Integer fetchDelayMillis, boolean isTaskCurrentReturn ) @@ -5321,7 +5318,7 @@ private TestableKinesisSupervisor getTestableSupervisorCustomIsTaskCurrent( lateMessageRejectionPeriod, earlyMessageRejectionPeriod, null, - recordsPerFetch, + null, fetchDelayMillis, null, null, @@ -5389,7 +5386,6 @@ private KinesisSupervisor createSupervisor( Period lateMessageRejectionPeriod, Period earlyMessageRejectionPeriod, boolean suspended, - Integer recordsPerFetch, Integer fetchDelayMillis, DataSchema dataSchema, KinesisSupervisorTuningConfig tuningConfig @@ -5410,7 +5406,7 @@ private KinesisSupervisor createSupervisor( lateMessageRejectionPeriod, earlyMessageRejectionPeriod, null, - recordsPerFetch, + null, fetchDelayMillis, null, null, @@ -5559,9 +5555,7 @@ private KinesisIndexTask createKinesisIndexTask( "awsEndpoint", null, null, - null, - null, - false + null ), Collections.emptyMap(), false, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java index 70611266efe3..ac84d2105cd1 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java @@ -52,6 +52,7 @@ public TestModifiedKinesisIndexTaskTuningConfig( @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically, @JsonProperty("skipSequenceNumberAvailabilityCheck") Boolean skipSequenceNumberAvailabilityCheck, @JsonProperty("recordBufferSize") Integer recordBufferSize, + @JsonProperty("recordBufferSizeBytes") Integer recordBufferSizeBytes, @JsonProperty("recordBufferOfferTimeout") Integer recordBufferOfferTimeout, @JsonProperty("recordBufferFullWait") Integer recordBufferFullWait, @JsonProperty("fetchThreads") Integer fetchThreads, @@ -60,6 +61,7 @@ public TestModifiedKinesisIndexTaskTuningConfig( @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, + @JsonProperty("maxBytesPerPoll") @Nullable Integer maxBytesPerPoll, @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, @JsonProperty("extra") String extra ) @@ -81,6 +83,7 @@ public TestModifiedKinesisIndexTaskTuningConfig( resetOffsetAutomatically, skipSequenceNumberAvailabilityCheck, recordBufferSize, + recordBufferSizeBytes, recordBufferOfferTimeout, recordBufferFullWait, fetchThreads, @@ -89,6 +92,7 @@ public TestModifiedKinesisIndexTaskTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, + maxBytesPerPoll, intermediateHandoffPeriod ); this.extra = extra; @@ -113,6 +117,7 @@ public 
TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig bas base.isResetOffsetAutomatically(), base.isSkipSequenceNumberAvailabilityCheck(), base.getRecordBufferSizeConfigured(), + base.getRecordBufferSizeBytesConfigured(), base.getRecordBufferOfferTimeout(), base.getRecordBufferFullWait(), base.getFetchThreads(), @@ -121,6 +126,7 @@ public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig bas base.getMaxParseExceptions(), base.getMaxSavedParseExceptions(), base.getMaxRecordsPerPollConfigured(), + base.getMaxBytesPerPollConfigured(), base.getIntermediateHandoffPeriod() ); this.extra = extra; diff --git a/processing/src/main/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueue.java b/processing/src/main/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueue.java new file mode 100644 index 000000000000..49105f3a8337 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueue.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.java.util.common; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Stream; + +/** + * Similar to LinkedBlockingQueue but can be bounded by the total byte size of the items present in the queue + * rather than number of items. + */ +public class MemoryBoundLinkedBlockingQueue +{ + private final long memoryBound; + private final AtomicLong currentMemory; + private final LinkedBlockingQueue> queue; + private final ReentrantLock putLock = new ReentrantLock(); + private final Condition notFull = putLock.newCondition(); + + public MemoryBoundLinkedBlockingQueue(long memoryBound) + { + this(new LinkedBlockingQueue<>(), memoryBound); + } + + @VisibleForTesting + MemoryBoundLinkedBlockingQueue(LinkedBlockingQueue> queue, long memoryBound) + { + this.memoryBound = memoryBound; + this.currentMemory = new AtomicLong(0L); + this.queue = queue; + } + + // returns true/false depending on whether item was added or not + public boolean offer(ObjectContainer item) + { + final long itemLength = item.getSize(); + + if (currentMemory.addAndGet(itemLength) <= memoryBound) { + if (queue.offer(item)) { + return true; + } + } + currentMemory.addAndGet(-itemLength); + return false; + } + + public boolean offer(ObjectContainer item, long timeout, TimeUnit unit) throws InterruptedException + { + final long itemLength = item.getSize(); + + long nanos = unit.toNanos(timeout); + final ReentrantLock putLock = this.putLock; + putLock.lockInterruptibly(); + try { + while (currentMemory.get() + itemLength > memoryBound) { + if (nanos <= 0L) { + return false; + } + nanos = notFull.awaitNanos(nanos); + } + if 
(currentMemory.addAndGet(itemLength) <= memoryBound) { + if (queue.offer(item, timeout, unit)) { + return true; + } + } + } + catch (InterruptedException e) { + currentMemory.addAndGet(-itemLength); + throw e; + } + finally { + putLock.unlock(); + } + currentMemory.addAndGet(-itemLength); + return false; + } + + // blocks until at least one item is available to take + public ObjectContainer take() throws InterruptedException + { + final ObjectContainer ret = queue.take(); + currentMemory.addAndGet(-ret.getSize()); + signalNotFull(); + return ret; + } + + public Stream> stream() + { + return queue.stream(); + } + + /** + * Drain up to specified bytes worth of items from the queue into the provided buffer. At least one record is + * drained from the queue, regardless of the value of bytes specified. + * + * @param buffer The buffer to drain queue items into. + * @param bytesToDrain The amount of bytes to drain from the queue + * @param timeout The maximum time allowed to drain the queue + * @param unit The time unit of the timeout. + * + * @return The number of items drained from the queue. 
+ * @throws InterruptedException + */ + public int drain(Collection> buffer, int bytesToDrain, long timeout, TimeUnit unit) + throws InterruptedException + { + Preconditions.checkNotNull(buffer); + boolean signalNotFull = false; + try { + long deadline = System.nanoTime() + unit.toNanos(timeout); + int added = 0; + long bytesAdded = 0; + while (bytesAdded < bytesToDrain) { + ObjectContainer e = queue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS); + if (e == null) { + break; + } + currentMemory.addAndGet(-e.getSize()); + signalNotFull = true; + buffer.add(e); + ++added; + bytesAdded += e.getSize(); + e = queue.peek(); + if (e != null && (bytesAdded + e.getSize()) > bytesToDrain) { + break; + } + } + return added; + } + finally { + if (signalNotFull) { + signalNotFull(); + } + } + } + + public int size() + { + return queue.size(); + } + + public long byteSize() + { + return currentMemory.get(); + } + + public long remainingCapacity() + { + return memoryBound - currentMemory.get(); + } + + private void signalNotFull() + { + final ReentrantLock putLock = this.putLock; + putLock.lock(); + try { + notFull.signal(); + } + finally { + putLock.unlock(); + } + } + + public static class ObjectContainer + { + private final T data; + private final long size; + + public ObjectContainer(T data, long size) + { + this.data = data; + this.size = size; + } + + public T getData() + { + return data; + } + + public long getSize() + { + return size; + } + } +} + diff --git a/processing/src/test/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueueTest.java b/processing/src/test/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueueTest.java new file mode 100644 index 000000000000..ec36d83f250c --- /dev/null +++ b/processing/src/test/java/org/apache/druid/java/util/common/MemoryBoundLinkedBlockingQueueTest.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common; + +import com.google.common.collect.ImmutableList; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class MemoryBoundLinkedBlockingQueueTest +{ + @Test + public void test_offer_emptyQueueWithEnoughCapacity_true() + { + + long byteCapacity = 10L; + MemoryBoundLinkedBlockingQueue queue = setupQueue(byteCapacity, ImmutableList.of()); + byte[] item = "item".getBytes(StandardCharsets.UTF_8); + + boolean succeeds = queue.offer(new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item, item.length)); + + long expectedByteSize = item.length; + Assert.assertTrue(succeeds); + Assert.assertEquals(1, queue.size()); + Assert.assertEquals(expectedByteSize, queue.byteSize()); + Assert.assertEquals(byteCapacity - item.length, queue.remainingCapacity()); + } + + @Test + public void test_offer_nonEmptyQueueWithEnoughCapacity_true() + { + + long byteCapacity = 10L; + byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8); + byte[] item2 = 
"item2".getBytes(StandardCharsets.UTF_8); + Collection> items = buildItemContainers( + ImmutableList.of(item1) + ); + MemoryBoundLinkedBlockingQueue queue = setupQueue(byteCapacity, items); + + boolean succeeds = queue.offer(new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item2, item2.length)); + + long expectedByteSize = item1.length + item2.length; + Assert.assertTrue(succeeds); + Assert.assertEquals(2, queue.size()); + Assert.assertEquals(expectedByteSize, queue.byteSize()); + Assert.assertEquals(byteCapacity - expectedByteSize, queue.remainingCapacity()); + } + + @Test + public void test_offer_queueWithoutEnoughCapacity_false() + { + + long byteCapacity = 7L; + byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8); + byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8); + Collection> items = buildItemContainers( + ImmutableList.of(item1) + ); + MemoryBoundLinkedBlockingQueue queue = setupQueue(byteCapacity, items); + + boolean succeeds = queue.offer(new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item2, item2.length)); + + long expectedByteSize = item1.length; + Assert.assertFalse(succeeds); + Assert.assertEquals(1, queue.size()); + Assert.assertEquals(expectedByteSize, queue.byteSize()); + Assert.assertEquals(byteCapacity - expectedByteSize, queue.remainingCapacity()); + } + + @Test + public void test_offerWithTimeLimit_interruptedExceptinThrown_throws() + { + long byteCapacity = 10L; + MemoryBoundLinkedBlockingQueue queue = setupQueue( + byteCapacity, + ImmutableList.of(), + new InterruptedExceptionThrowingQueue() + ); + byte[] item = "item".getBytes(StandardCharsets.UTF_8); + + Assert.assertThrows( + InterruptedException.class, + () -> queue.offer( + new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item, item.length), + 1L, + TimeUnit.MILLISECONDS + ) + ); + + Assert.assertEquals(0, queue.size()); + Assert.assertEquals(0L, queue.byteSize()); + Assert.assertEquals(byteCapacity, queue.remainingCapacity()); + } + + @Test + public void 
test_offerWithTimeLimit_fullQueue_waitsTime() throws InterruptedException + { + long timeoutMillis = 2000L; + long byteCapacity = 10L; + byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8); + byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8); + Collection> items = buildItemContainers( + ImmutableList.of(item1, item2) + ); + MemoryBoundLinkedBlockingQueue queue = setupQueue( + byteCapacity, + items, + new InterruptedExceptionThrowingQueue() + ); + byte[] item = "item".getBytes(StandardCharsets.UTF_8); + long start = System.currentTimeMillis(); + boolean succeeds = queue.offer( + new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item, item.length), + timeoutMillis, + TimeUnit.MILLISECONDS + ); + long end = System.currentTimeMillis(); + + Assert.assertFalse(succeeds); + Assert.assertTrue( + StringUtils.format( + "offer only waited at most [%d] nanos instead of expected [%d] nanos", + TimeUnit.MILLISECONDS.toNanos(end - start), + TimeUnit.MILLISECONDS.toNanos(timeoutMillis) + ), + TimeUnit.MILLISECONDS.toNanos(end - start) >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis) + ); + Assert.assertEquals(2, queue.size()); + Assert.assertEquals(10L, queue.byteSize()); + Assert.assertEquals(0L, queue.remainingCapacity()); + } + + @Test + public void test_take_nonEmptyQueue_expected() throws InterruptedException + { + + long byteCapacity = 10L; + byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8); + byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8); + MemoryBoundLinkedBlockingQueue.ObjectContainer item1Container = + new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item1, item1.length); + MemoryBoundLinkedBlockingQueue.ObjectContainer item2Container = + new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(item2, item2.length); + MemoryBoundLinkedBlockingQueue queue = setupQueue(byteCapacity, ImmutableList.of()); + Assert.assertTrue(queue.offer(item1Container)); + Assert.assertTrue(queue.offer(item2Container)); + + 
MemoryBoundLinkedBlockingQueue.ObjectContainer takenItem = queue.take(); + long expectedByteSize = item2.length; + + Assert.assertSame(item1Container, takenItem); + Assert.assertEquals(1, queue.size()); + Assert.assertEquals(expectedByteSize, queue.byteSize()); + Assert.assertEquals(byteCapacity - expectedByteSize, queue.remainingCapacity()); + } + + @Test + public void test_drain_emptyQueue_succeeds() throws InterruptedException + { + + long byteCapacity = 7L; + MemoryBoundLinkedBlockingQueue queue = setupQueue(byteCapacity, ImmutableList.of()); + Collection> buffer = new ArrayList<>(); + + int numAdded = queue.drain(buffer, 1, 1, TimeUnit.SECONDS); + + Assert.assertTrue(numAdded == 0 && numAdded == buffer.size()); + Assert.assertEquals(0, queue.size()); + Assert.assertEquals(0L, queue.byteSize()); + Assert.assertEquals(byteCapacity, queue.remainingCapacity()); + } + + @Test + public void test_drain_queueWithOneItem_succeeds() throws InterruptedException + { + + long byteCapacity = 7L; + byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8); + Collection> items = buildItemContainers( + ImmutableList.of(item1) + ); + MemoryBoundLinkedBlockingQueue queue = setupQueue(byteCapacity, items); + Collection> buffer = new ArrayList<>(); + + int numAdded = queue.drain(buffer, 1, 1, TimeUnit.MINUTES); + + Assert.assertTrue(numAdded == 1 && numAdded == buffer.size()); + Assert.assertEquals(0, queue.size()); + Assert.assertEquals(0L, queue.byteSize()); + Assert.assertEquals(byteCapacity, queue.remainingCapacity()); + } + + @Test + public void test_drain_queueWithMultipleItems_succeeds() throws InterruptedException + { + + long byteCapacity = 15L; + byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8); + byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8); + byte[] item3 = "item3".getBytes(StandardCharsets.UTF_8); + Collection> items = buildItemContainers( + ImmutableList.of(item1, item2, item3) + ); + MemoryBoundLinkedBlockingQueue queue = setupQueue(byteCapacity, 
items, new NotAllDrainedQueue()); + Collection> buffer = new ArrayList<>(); + + int numAdded = queue.drain(buffer, 10, 1, TimeUnit.MINUTES); + + Assert.assertTrue(numAdded == 2 && numAdded == buffer.size()); + Assert.assertEquals(1, queue.size()); + Assert.assertEquals(item3.length, queue.byteSize()); + Assert.assertEquals(byteCapacity - item3.length, queue.remainingCapacity()); + } + + @Test + public void test_drain_queueWithFirstItemSizeGreaterThanLimit_succeeds() throws InterruptedException + { + + long byteCapacity = 15L; + byte[] item1 = "item1".getBytes(StandardCharsets.UTF_8); + byte[] item2 = "item2".getBytes(StandardCharsets.UTF_8); + byte[] item3 = "item3".getBytes(StandardCharsets.UTF_8); + Collection> items = buildItemContainers( + ImmutableList.of(item1, item2, item3) + ); + MemoryBoundLinkedBlockingQueue queue = setupQueue(byteCapacity, items, new NotAllDrainedQueue()); + Collection> buffer = new ArrayList<>(); + + int numAdded = queue.drain(buffer, item1.length - 1, 1, TimeUnit.MINUTES); + + Assert.assertTrue(numAdded == 1 && numAdded == buffer.size()); + Assert.assertEquals(2, queue.size()); + Assert.assertEquals(item2.length + item3.length, queue.byteSize()); + Assert.assertEquals(byteCapacity - (item2.length + item3.length), queue.remainingCapacity()); + } + + private static MemoryBoundLinkedBlockingQueue setupQueue( + long byteCapacity, + Collection> items + ) + { + return setupQueue(byteCapacity, items, null); + } + + private static MemoryBoundLinkedBlockingQueue setupQueue( + long byteCapacity, + Collection> items, + @Nullable LinkedBlockingQueue> underlyingQueue + ) + { + Assert.assertTrue(getTotalSizeOfItems(items) <= byteCapacity); + MemoryBoundLinkedBlockingQueue queue = underlyingQueue != null ? 
+ new MemoryBoundLinkedBlockingQueue<>(underlyingQueue, byteCapacity) : new MemoryBoundLinkedBlockingQueue<>(byteCapacity); + items.forEach(i -> Assert.assertTrue(queue.offer(i))); + return queue; + } + + private static Collection> buildItemContainers( + Collection items + ) + { + return items.stream() + .map(i -> new MemoryBoundLinkedBlockingQueue.ObjectContainer<>(i, i.length)) + .collect(Collectors.toList()); + } + + private static long getTotalSizeOfItems(Collection> items) + { + return items.stream().mapToLong(MemoryBoundLinkedBlockingQueue.ObjectContainer::getSize).sum(); + } + + static class InterruptedExceptionThrowingQueue + extends LinkedBlockingQueue> + { + @Override + public boolean offer(MemoryBoundLinkedBlockingQueue.ObjectContainer item, long timeout, TimeUnit unit) + throws InterruptedException + { + throw new InterruptedException("exception thrown"); + } + } + + static class NotAllDrainedQueue + extends LinkedBlockingQueue> + { + @Override + public int drainTo(Collection> c, int maxElements) + { + MemoryBoundLinkedBlockingQueue.ObjectContainer firstItem = this.poll(); + c.add(firstItem); + return 1; + } + } +} diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx index 3594f70c992e..41f77593d28a 100644 --- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx +++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx @@ -1222,13 +1222,6 @@ export function getIoConfigTuningFormFields( ), }, - { - name: 'recordsPerFetch', - type: 'number', - defaultValue: 4000, - defined: typeIsKnown(KNOWN_TYPES, 'kinesis'), - info: <>The number of records to request per GetRecords call to Kinesis., - }, { name: 'pollTimeout', type: 'number', @@ -1249,13 +1242,6 @@ export function getIoConfigTuningFormFields( defined: typeIsKnown(KNOWN_TYPES, 'kinesis'), info: <>Time in milliseconds to wait between subsequent GetRecords calls to Kinesis., }, - { - name: 
'deaggregate', - type: 'boolean', - defaultValue: false, - defined: typeIsKnown(KNOWN_TYPES, 'kinesis'), - info: <>Whether to use the de-aggregate function of the KCL., - }, { name: 'startDelay', type: 'duration', @@ -1443,7 +1429,7 @@ export interface TuningConfig { offsetFetchPeriod?: string; maxParseExceptions?: number; maxSavedParseExceptions?: number; - recordBufferSize?: number; + recordBufferSizeBytes?: number; recordBufferOfferTimeout?: number; recordBufferFullWait?: number; fetchThreads?: number; @@ -2055,13 +2041,13 @@ const TUNING_FORM_FIELDS: Field[] = [ ), }, { - name: 'spec.tuningConfig.recordBufferSize', + name: 'spec.tuningConfig.recordBufferSizeBytes', type: 'number', - defaultValue: 10000, + defaultValue: 100000000, defined: typeIsKnown(KNOWN_TYPES, 'kinesis'), info: ( <> - Size of the buffer (number of events) used between the Kinesis fetch threads and the main + Size of the buffer (heap memory bytes) used between the Kinesis fetch threads and the main ingestion thread. ), @@ -2106,15 +2092,15 @@ const TUNING_FORM_FIELDS: Field[] = [ ), }, { - name: 'spec.tuningConfig.maxRecordsPerPoll', + name: 'spec.tuningConfig.maxBytesPerPoll', type: 'number', - defaultValue: 100, + defaultValue: 1000000, defined: typeIsKnown(KNOWN_TYPES, 'kinesis'), hideInMore: true, info: ( <> - The maximum number of records/events to be fetched from buffer per poll. The actual maximum - will be max(maxRecordsPerPoll, max(bufferSize, 1)). + The maximum number of bytes to be fetched from buffer per poll. At least one record will be + fetched regardless of config. 
), }, diff --git a/website/.spelling b/website/.spelling index 175774e4ac24..503db8fdf319 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1127,6 +1127,7 @@ LZ4LZFuncompressedLZ4LZ4LZFuncompressednoneLZ4autolongsautolongslongstypeconcise deaggregate druid-kinesis-indexing-service maxRecordsPerPoll +maxBytesPerPoll maxRecordsPerPollrecordsPerFetchfetchDelayMillisreplicasfetchDelayMillisrecordsPerFetchfetchDelayMillismaxRecordsPerPollamazon-kinesis-client1 numKinesisShards numProcessors