diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md index eff12d918508..ebde49e3da26 100644 --- a/docs/development/extensions-core/kafka-supervisor-reference.md +++ b/docs/development/extensions-core/kafka-supervisor-reference.md @@ -204,7 +204,7 @@ The `tuningConfig` is optional and default parameters will be used if no `tuning | `indexSpec` | Object | Tune how data is indexed. See [IndexSpec](#indexspec) for more information. | no | | `indexSpecForIntermediatePersists`| | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values. | no (default = same as `indexSpec`) | | `reportParseExceptions` | Boolean | *DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1. | no (default == false) | -| `handoffConditionTimeout` | Long | Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. | no (default == 0) | +| `handoffConditionTimeout` | Long | Number of milliseconds to wait for segment handoff. Set to a value >= 0, where 0 means to wait indefinitely. | no (default == 900000 [15 minutes]) | | `resetOffsetAutomatically` | Boolean | Controls behavior when Druid needs to read Kafka messages that are no longer available (i.e. 
when `OffsetOutOfRangeException` is encountered).

If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation, potentially using the [Reset Supervisor API](../../api-reference/supervisor-api.md). This mode is useful for production, since it will make you aware of issues with ingestion.

If true, Druid will automatically reset to the earliest or latest offset available in Kafka, based on the value of the `useEarliestOffset` property (earliest if true, latest if false). Note that this can lead to data being _DROPPED_ (if `useEarliestOffset` is false) or _DUPLICATED_ (if `useEarliestOffset` is true) without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping or duplicating of data.

This feature behaves similarly to the Kafka `auto.offset.reset` consumer property. | no (default == false) | | `workerThreads` | Integer | The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation. | no (default == min(10, taskCount)) | | `chatAsync` | Boolean | If true, use asynchronous communication with indexing tasks, and ignore the `chatThreads` parameter. If false, use synchronous communication in a thread pool of size `chatThreads`. | no (default == true) | diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md index 52abcba4b324..1b921106885e 100644 --- a/docs/development/extensions-core/kinesis-ingestion.md +++ b/docs/development/extensions-core/kinesis-ingestion.md @@ -283,7 +283,7 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param |`indexSpec`|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| |`indexSpecForIntermediatePersists`|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.| no (default = same as `indexSpec`)| |`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)| -|`handoffConditionTimeout`|Long| Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.| no (default == 0)| +|`handoffConditionTimeout`|Long| Number of milliseconds to wait for segment handoff. 
Set to a value >= 0, where 0 means to wait indefinitely.| no (default == 900000 [15 minutes])| |`resetOffsetAutomatically`|Boolean|Controls behavior when Druid needs to read Kinesis messages that are no longer available.

If false, the exception bubbles up, causing tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation, potentially using the [Reset Supervisor API](../../api-reference/supervisor-api.md). This mode is useful for production, since it highlights issues with ingestion.

If true, Druid automatically resets to the earliest or latest sequence number available in Kinesis, based on the value of the `useEarliestSequenceNumber` property (earliest if true, latest if false). Note that this can lead to data being *DROPPED* (if `useEarliestSequenceNumber` is false) or *DUPLICATED* (if `useEarliestSequenceNumber` is true) without your knowledge. Druid will log messages indicating that a reset has occurred without interrupting ingestion. This mode is useful for non-production situations since it enables Druid to recover from problems automatically, even if they lead to quiet dropping or duplicating of data.|no (default == false)| |`skipSequenceNumberAvailabilityCheck`|Boolean|Whether to enable checking if the current sequence number is still available in a particular Kinesis shard. If set to false, the indexing task will attempt to reset the current sequence number (or not), depending on the value of `resetOffsetAutomatically`.|no (default == false)| |`workerThreads`|Integer|The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation.|no (default == min(10, taskCount))| diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index f0b489ec5972..1b8b22e2c6ee 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.kafka; -import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import nl.jqno.equalsverifier.EqualsVerifier; import 
org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig; @@ -35,6 +34,7 @@ import java.io.File; import java.io.IOException; +import java.time.Duration; public class KafkaIndexTaskTuningConfigTest { @@ -43,7 +43,7 @@ public class KafkaIndexTaskTuningConfigTest public KafkaIndexTaskTuningConfigTest() { mapper = new DefaultObjectMapper(); - mapper.registerModules((Iterable) new KafkaIndexTaskModule().getJacksonModules()); + mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules()); } @Test @@ -71,7 +71,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec()); Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(false, config.isReportParseExceptions()); - Assert.assertEquals(0, config.getHandoffConditionTimeout()); + Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout()); } @Test diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java index d9d572220b19..e73315fbee0a 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.kafka.supervisor; -import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import org.apache.druid.jackson.DefaultObjectMapper; @@ -39,7 +38,7 @@ public class KafkaSupervisorTuningConfigTest public KafkaSupervisorTuningConfigTest() { mapper = new DefaultObjectMapper(); - 
mapper.registerModules((Iterable) new KafkaIndexTaskModule().getJacksonModules()); + mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules()); } @Test @@ -66,7 +65,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec()); Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(false, config.isReportParseExceptions()); - Assert.assertEquals(0, config.getHandoffConditionTimeout()); + Assert.assertEquals(java.time.Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout()); Assert.assertNull(config.getWorkerThreads()); Assert.assertNull(config.getChatThreads()); Assert.assertEquals(8L, (long) config.getChatRetries()); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index ed497a64cfac..6136a8994219 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -20,7 +20,6 @@ package org.apache.druid.indexing.kinesis; import com.fasterxml.jackson.databind.JsonMappingException; -import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig; @@ -39,6 +38,7 @@ import java.io.File; import java.io.IOException; +import java.time.Duration; public class KinesisIndexTaskTuningConfigTest { @@ -47,7 +47,7 @@ public class KinesisIndexTaskTuningConfigTest public KinesisIndexTaskTuningConfigTest() { mapper = new DefaultObjectMapper(); - 
mapper.registerModules((Iterable) new KinesisIndexingServiceModule().getJacksonModules()); + mapper.registerModules(new KinesisIndexingServiceModule().getJacksonModules()); } @Rule @@ -76,7 +76,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec()); Assert.assertFalse(config.isReportParseExceptions()); - Assert.assertEquals(0, config.getHandoffConditionTimeout()); + Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout()); Assert.assertNull(config.getRecordBufferSizeConfigured()); Assert.assertEquals(10000, config.getRecordBufferSizeOrDefault(1_000_000_000, false)); Assert.assertEquals(5000, config.getRecordBufferOfferTimeout()); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java index 320a978f1251..140c7094f049 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java @@ -19,7 +19,6 @@ package org.apache.druid.indexing.kinesis.supervisor; -import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule; import org.apache.druid.jackson.DefaultObjectMapper; @@ -38,7 +37,7 @@ public class KinesisSupervisorTuningConfigTest public KinesisSupervisorTuningConfigTest() { mapper = new DefaultObjectMapper(); - mapper.registerModules((Iterable) new KinesisIndexingServiceModule().getJacksonModules()); + mapper.registerModules(new 
KinesisIndexingServiceModule().getJacksonModules()); } @Test @@ -64,7 +63,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec()); Assert.assertEquals(false, config.isReportParseExceptions()); - Assert.assertEquals(0, config.getHandoffConditionTimeout()); + Assert.assertEquals(java.time.Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout()); Assert.assertNull(config.getWorkerThreads()); Assert.assertNull(config.getChatThreads()); Assert.assertEquals(8L, (long) config.getChatRetries()); diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java index b4cda686816d..32c95ebc6add 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java @@ -37,6 +37,7 @@ import javax.annotation.Nullable; import java.io.File; +import java.time.Duration; /** * @@ -51,7 +52,7 @@ public class RealtimeTuningConfig implements AppenderatorConfig private static final ShardSpec DEFAULT_SHARD_SPEC = new NumberedShardSpec(0, 1); private static final IndexSpec DEFAULT_INDEX_SPEC = IndexSpec.DEFAULT; private static final Boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = Boolean.FALSE; - private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = 0; + private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = Duration.ofMinutes(15).toMillis(); private static final long DEFAULT_ALERT_TIMEOUT = 0; private static final String DEFAULT_DEDUP_COLUMN = null; diff --git a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java index 535dd108fdf1..abc95719707f 100644 --- 
a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java @@ -26,6 +26,7 @@ import org.apache.druid.segment.incremental.OnheapIncrementalIndex; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.hamcrest.CoreMatchers; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.Assert; import org.junit.Test; @@ -82,7 +83,7 @@ public void testSerdeWithDefaults() throws Exception ); Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec()); - Assert.assertEquals(0, config.getHandoffConditionTimeout()); + Assert.assertEquals(Duration.standardMinutes(15).getMillis(), config.getHandoffConditionTimeout()); Assert.assertEquals(0, config.getAlertTimeout()); Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec()); Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists());