From b0bb6b7921613523db5868026b99cc04e7e8c5dd Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 21 Sep 2022 10:45:23 -0700 Subject: [PATCH 1/4] Change default handoffConditionTimeout to 15 minutes. Most of the time, when handoff is taking this long, it's because something is preventing Historicals from loading new data. In this case, we have two choices: 1) Stop making progress on ingestion, wait for Historicals to load stuff, and keep the waiting-for-handoff segments available on realtime tasks. (handoffConditionTimeout = 0, the current default) 2) Continue making progress on ingestion, by exiting the realtime tasks that were waiting for handoff. Once the Historicals get their act together, the segments will be loaded, as they are still there on deep storage. They will just not be continuously available. (handoffConditionTimeout > 0) I believe most users would prefer [2], because [1] risks ingestion falling behind the stream, which causes many other problems. It can cause data loss if the stream ages-out data before we have a chance to ingest it. Due to the way tuningConfigs are serialized -- defaults are baked into the serialized form that is written to the database -- this default change will not change anyone's existing supervisors. It will take effect for newly created supervisors. --- docs/development/extensions-core/kafka-supervisor-reference.md | 2 +- docs/development/extensions-core/kinesis-ingestion.md | 2 +- .../apache/druid/segment/indexing/RealtimeTuningConfig.java | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md index eff12d918508..0a705e872ead 100644 --- a/docs/development/extensions-core/kafka-supervisor-reference.md +++ b/docs/development/extensions-core/kafka-supervisor-reference.md @@ -204,7 +204,7 @@ The `tuningConfig` is optional and default parameters will be used if no `tuning | `indexSpec` | Object | Tune how data is indexed. See [IndexSpec](#indexspec) for more information. | no | | `indexSpecForIntermediatePersists`| | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values. | no (default = same as `indexSpec`) | | `reportParseExceptions` | Boolean | *DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1. | no (default == false) | -| `handoffConditionTimeout` | Long | Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. | no (default == 0) | +| `handoffConditionTimeout` | Long | Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. | no (default == 900000 [15 minutes]) | | `resetOffsetAutomatically` | Boolean | Controls behavior when Druid needs to read Kafka messages that are no longer available (i.e. when `OffsetOutOfRangeException` is encountered).

If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../api-reference/supervisor-api.md). This mode is useful for production, since it will make you aware of issues with ingestion.

If true, Druid will automatically reset to the earlier or latest offset available in Kafka, based on the value of the `useEarliestOffset` property (earliest if true, latest if false). Note that this can lead to data being _DROPPED_ (if `useEarliestOffset` is false) or _DUPLICATED_ (if `useEarliestOffset` is true) without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping or duplicating of data.

This feature behaves similarly to the Kafka `auto.offset.reset` consumer property. | no (default == false) | | `workerThreads` | Integer | The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation. | no (default == min(10, taskCount)) | | `chatAsync` | Boolean | If true, use asynchronous communication with indexing tasks, and ignore the `chatThreads` parameter. If false, use synchronous communication in a thread pool of size `chatThreads`. | no (default == true) | diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md index 52abcba4b324..82b0a23671cc 100644 --- a/docs/development/extensions-core/kinesis-ingestion.md +++ b/docs/development/extensions-core/kinesis-ingestion.md @@ -283,7 +283,7 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param |`indexSpec`|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| |`indexSpecForIntermediatePersists`|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.| no (default = same as `indexSpec`)| |`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)| -|`handoffConditionTimeout`|Long| Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.| no (default == 0)| +|`handoffConditionTimeout`|Long| Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.| no (default == 900000 [15 minutes])| |`resetOffsetAutomatically`|Boolean|Controls behavior when Druid needs to read Kinesis messages that are no longer available.

If false, the exception bubbles up, causing tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation, potentially using the [Reset Supervisor API](../../api-reference/supervisor-api.md). This mode is useful for production, since it highlights issues with ingestion.

If true, Druid automatically resets to the earliest or latest sequence number available in Kinesis, based on the value of the `useEarliestSequenceNumber` property (earliest if true, latest if false). Note that this can lead to data being *DROPPED* (if `useEarliestSequenceNumber` is false) or *DUPLICATED* (if `useEarliestSequenceNumber` is true) without your knowledge. Druid will log messages indicating that a reset has occurred without interrupting ingestion. This mode is useful for non-production situations since it enables Druid to recover from problems automatically, even if they lead to quiet dropping or duplicating of data.|no (default == false)| |`skipSequenceNumberAvailabilityCheck`|Boolean|Whether to enable checking if the current sequence number is still available in a particular Kinesis shard. If set to false, the indexing task will attempt to reset the current sequence number (or not), depending on the value of `resetOffsetAutomatically`.|no (default == false)| |`workerThreads`|Integer|The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation.|no (default == min(10, taskCount))| diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java index b4cda686816d..32c95ebc6add 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java @@ -37,6 +37,7 @@ import javax.annotation.Nullable; import java.io.File; +import java.time.Duration; /** * @@ -51,7 +52,7 @@ public class RealtimeTuningConfig implements AppenderatorConfig private static final ShardSpec DEFAULT_SHARD_SPEC = new NumberedShardSpec(0, 1); private static final IndexSpec DEFAULT_INDEX_SPEC = IndexSpec.DEFAULT; private static final Boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = Boolean.FALSE; - private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = 0; + private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = Duration.ofMinutes(15).toMillis(); private static final long DEFAULT_ALERT_TIMEOUT = 0; private static final String DEFAULT_DEDUP_COLUMN = null; From 10cf2a0b91efa027350d2b7d042417924a5386cc Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 12 Jul 2023 09:32:02 -0700 Subject: [PATCH 2/4] Fix tests. --- .../indexing/kafka/KafkaIndexTaskTuningConfigTest.java | 6 +++--- .../kafka/supervisor/KafkaSupervisorTuningConfigTest.java | 5 ++--- .../indexing/kinesis/KinesisIndexTaskTuningConfigTest.java | 6 +++--- .../supervisor/KinesisSupervisorTuningConfigTest.java | 5 ++--- .../druid/segment/indexing/RealtimeTuningConfigTest.java | 3 ++- 5 files changed, 12 insertions(+), 13 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index f0b489ec5972..1b8b22e2c6ee 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.kafka; -import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; @@ -35,6 +34,7 @@ import java.io.File; import java.io.IOException; +import java.time.Duration; public class KafkaIndexTaskTuningConfigTest { @@ -43,7 +43,7 @@ public class KafkaIndexTaskTuningConfigTest public KafkaIndexTaskTuningConfigTest() { mapper = new DefaultObjectMapper(); - mapper.registerModules((Iterable) new KafkaIndexTaskModule().getJacksonModules()); + mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules()); } @Test @@ -71,7 +71,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec()); Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(false, config.isReportParseExceptions()); - Assert.assertEquals(0, config.getHandoffConditionTimeout()); + Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout()); } @Test diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java index d9d572220b19..e73315fbee0a 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.kafka.supervisor; -import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import org.apache.druid.jackson.DefaultObjectMapper; @@ -39,7 +38,7 @@ public class KafkaSupervisorTuningConfigTest public KafkaSupervisorTuningConfigTest() { mapper = new DefaultObjectMapper(); - mapper.registerModules((Iterable) new KafkaIndexTaskModule().getJacksonModules()); + mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules()); } @Test @@ -66,7 +65,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec()); Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(false, config.isReportParseExceptions()); - Assert.assertEquals(0, config.getHandoffConditionTimeout()); + Assert.assertEquals(java.time.Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout()); Assert.assertNull(config.getWorkerThreads()); Assert.assertNull(config.getChatThreads()); Assert.assertEquals(8L, (long) config.getChatRetries()); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index ed497a64cfac..6136a8994219 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.kinesis; import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig; @@ -39,6 +38,7 @@ import java.io.File; import java.io.IOException; +import java.time.Duration; public class KinesisIndexTaskTuningConfigTest { @@ -47,7 +47,7 @@ public class KinesisIndexTaskTuningConfigTest public KinesisIndexTaskTuningConfigTest() { mapper = new DefaultObjectMapper(); - mapper.registerModules((Iterable) new KinesisIndexingServiceModule().getJacksonModules()); + mapper.registerModules(new KinesisIndexingServiceModule().getJacksonModules()); } @Rule @@ -76,7 +76,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec()); Assert.assertFalse(config.isReportParseExceptions()); - Assert.assertEquals(0, config.getHandoffConditionTimeout()); + Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout()); Assert.assertNull(config.getRecordBufferSizeConfigured()); Assert.assertEquals(10000, config.getRecordBufferSizeOrDefault(1_000_000_000, false)); Assert.assertEquals(5000, config.getRecordBufferOfferTimeout()); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java index 320a978f1251..140c7094f049 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.kinesis.supervisor; -import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule; import org.apache.druid.jackson.DefaultObjectMapper; @@ -38,7 +37,7 @@ public class KinesisSupervisorTuningConfigTest public KinesisSupervisorTuningConfigTest() { mapper = new DefaultObjectMapper(); - mapper.registerModules((Iterable) new KinesisIndexingServiceModule().getJacksonModules()); + mapper.registerModules(new KinesisIndexingServiceModule().getJacksonModules()); } @Test @@ -64,7 +63,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec()); Assert.assertEquals(false, config.isReportParseExceptions()); - Assert.assertEquals(0, config.getHandoffConditionTimeout()); + Assert.assertEquals(java.time.Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout()); Assert.assertNull(config.getWorkerThreads()); Assert.assertNull(config.getChatThreads()); Assert.assertEquals(8L, (long) config.getChatRetries()); diff --git a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java index 535dd108fdf1..abc95719707f 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java @@ -26,6 +26,7 @@ import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.hamcrest.CoreMatchers; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.Assert; import org.junit.Test; @@ -82,7 +83,7 @@ public void testSerdeWithDefaults() throws Exception ); Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec()); - Assert.assertEquals(0, config.getHandoffConditionTimeout()); + Assert.assertEquals(Duration.standardMinutes(15).getMillis(), config.getHandoffConditionTimeout()); Assert.assertEquals(0, config.getAlertTimeout()); Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec()); Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists()); From ed53b8fcbd41dabf88382b1589bac705c4437088 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 13 Jul 2023 07:58:21 -0700 Subject: [PATCH 3/4] Update docs/development/extensions-core/kafka-supervisor-reference.md Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> --- docs/development/extensions-core/kafka-supervisor-reference.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md index 0a705e872ead..ebde49e3da26 100644 --- a/docs/development/extensions-core/kafka-supervisor-reference.md +++ b/docs/development/extensions-core/kafka-supervisor-reference.md @@ -204,7 +204,7 @@ The `tuningConfig` is optional and default parameters will be used if no `tuning | `indexSpec` | Object | Tune how data is indexed. See [IndexSpec](#indexspec) for more information. | no | | `indexSpecForIntermediatePersists`| | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values. | no (default = same as `indexSpec`) | | `reportParseExceptions` | Boolean | *DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1. | no (default == false) | -| `handoffConditionTimeout` | Long | Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. | no (default == 900000 [15 minutes]) | +| `handoffConditionTimeout` | Long | Number of milliseconds to wait for segment handoff. Set to a value >= 0, where 0 means to wait indefinitely. | no (default == 900000 [15 minutes]) | | `resetOffsetAutomatically` | Boolean | Controls behavior when Druid needs to read Kafka messages that are no longer available (i.e. when `OffsetOutOfRangeException` is encountered).

If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../api-reference/supervisor-api.md). This mode is useful for production, since it will make you aware of issues with ingestion.

If true, Druid will automatically reset to the earlier or latest offset available in Kafka, based on the value of the `useEarliestOffset` property (earliest if true, latest if false). Note that this can lead to data being _DROPPED_ (if `useEarliestOffset` is false) or _DUPLICATED_ (if `useEarliestOffset` is true) without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping or duplicating of data.

This feature behaves similarly to the Kafka `auto.offset.reset` consumer property. | no (default == false) | | `workerThreads` | Integer | The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation. | no (default == min(10, taskCount)) | | `chatAsync` | Boolean | If true, use asynchronous communication with indexing tasks, and ignore the `chatThreads` parameter. If false, use synchronous communication in a thread pool of size `chatThreads`. | no (default == true) | From 04efc5ba788b9a6541fd787490dd22c2849717e8 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 13 Jul 2023 07:58:27 -0700 Subject: [PATCH 4/4] Update docs/development/extensions-core/kinesis-ingestion.md Co-authored-by: Katya Macedo <38017980+ektravel@users.noreply.github.com> --- docs/development/extensions-core/kinesis-ingestion.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md index 82b0a23671cc..1b921106885e 100644 --- a/docs/development/extensions-core/kinesis-ingestion.md +++ b/docs/development/extensions-core/kinesis-ingestion.md @@ -283,7 +283,7 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param |`indexSpec`|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| |`indexSpecForIntermediatePersists`|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.| no (default = same as `indexSpec`)| |`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)| -|`handoffConditionTimeout`|Long| Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.| no (default == 900000 [15 minutes])| +|`handoffConditionTimeout`|Long| Number of milliseconds to wait for segment handoff. Set to a value >= 0, where 0 means to wait indefinitely.| no (default == 900000 [15 minutes])| |`resetOffsetAutomatically`|Boolean|Controls behavior when Druid needs to read Kinesis messages that are no longer available.

If false, the exception bubbles up, causing tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation, potentially using the [Reset Supervisor API](../../api-reference/supervisor-api.md). This mode is useful for production, since it highlights issues with ingestion.

If true, Druid automatically resets to the earliest or latest sequence number available in Kinesis, based on the value of the `useEarliestSequenceNumber` property (earliest if true, latest if false). Note that this can lead to data being *DROPPED* (if `useEarliestSequenceNumber` is false) or *DUPLICATED* (if `useEarliestSequenceNumber` is true) without your knowledge. Druid will log messages indicating that a reset has occurred without interrupting ingestion. This mode is useful for non-production situations since it enables Druid to recover from problems automatically, even if they lead to quiet dropping or duplicating of data.|no (default == false)| |`skipSequenceNumberAvailabilityCheck`|Boolean|Whether to enable checking if the current sequence number is still available in a particular Kinesis shard. If set to false, the indexing task will attempt to reset the current sequence number (or not), depending on the value of `resetOffsetAutomatically`.|no (default == false)| |`workerThreads`|Integer|The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation.|no (default == min(10, taskCount))|