Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ The `tuningConfig` is optional and default parameters will be used if no `tuning
| `indexSpec` | Object | Tune how data is indexed. See [IndexSpec](#indexspec) for more information. | no |
| `indexSpecForIntermediatePersists`| | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values. | no (default = same as `indexSpec`) |
| `reportParseExceptions` | Boolean | *DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1. | no (default == false) |
| `handoffConditionTimeout` | Long | Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. | no (default == 0) |
| `handoffConditionTimeout` | Long | Number of milliseconds to wait for segment handoff. Set to a value >= 0, where 0 means to wait indefinitely. | no (default == 900000 [15 minutes]) |
| `resetOffsetAutomatically` | Boolean | Controls behavior when Druid needs to read Kafka messages that are no longer available (i.e. when `OffsetOutOfRangeException` is encountered).<br/><br/>If false, the exception will bubble up, which will cause your tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation; potentially using the [Reset Supervisor API](../../api-reference/supervisor-api.md). This mode is useful for production, since it will make you aware of issues with ingestion.<br/><br/>If true, Druid will automatically reset to the earlier or latest offset available in Kafka, based on the value of the `useEarliestOffset` property (earliest if true, latest if false). Note that this can lead to data being _DROPPED_ (if `useEarliestOffset` is false) or _DUPLICATED_ (if `useEarliestOffset` is true) without your knowledge. Messages will be logged indicating that a reset has occurred, but ingestion will continue. This mode is useful for non-production situations, since it will make Druid attempt to recover from problems automatically, even if they lead to quiet dropping or duplicating of data.<br/><br/>This feature behaves similarly to the Kafka `auto.offset.reset` consumer property. | no (default == false) |
| `workerThreads` | Integer | The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation. | no (default == min(10, taskCount)) |
| `chatAsync` | Boolean | If true, use asynchronous communication with indexing tasks, and ignore the `chatThreads` parameter. If false, use synchronous communication in a thread pool of size `chatThreads`. | no (default == true) |
Expand Down
2 changes: 1 addition & 1 deletion docs/development/extensions-core/kinesis-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param
|`indexSpec`|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no|
|`indexSpecForIntermediatePersists`|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.| no (default = same as `indexSpec`)|
|`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)|
|`handoffConditionTimeout`|Long| Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.| no (default == 0)|
|`handoffConditionTimeout`|Long| Number of milliseconds to wait for segment handoff. Set to a value >= 0, where 0 means to wait indefinitely.| no (default == 900000 [15 minutes])|
|`resetOffsetAutomatically`|Boolean|Controls behavior when Druid needs to read Kinesis messages that are no longer available.<br/><br/>If false, the exception bubbles up, causing tasks to fail and ingestion to halt. If this occurs, manual intervention is required to correct the situation, potentially using the [Reset Supervisor API](../../api-reference/supervisor-api.md). This mode is useful for production, since it highlights issues with ingestion.<br/><br/>If true, Druid automatically resets to the earliest or latest sequence number available in Kinesis, based on the value of the `useEarliestSequenceNumber` property (earliest if true, latest if false). Note that this can lead to data being *DROPPED* (if `useEarliestSequenceNumber` is false) or *DUPLICATED* (if `useEarliestSequenceNumber` is true) without your knowledge. Druid will log messages indicating that a reset has occurred without interrupting ingestion. This mode is useful for non-production situations since it enables Druid to recover from problems automatically, even if they lead to quiet dropping or duplicating of data.|no (default == false)|
|`skipSequenceNumberAvailabilityCheck`|Boolean|Whether to enable checking if the current sequence number is still available in a particular Kinesis shard. If set to false, the indexing task will attempt to reset the current sequence number (or not), depending on the value of `resetOffsetAutomatically`.|no (default == false)|
|`workerThreads`|Integer|The number of threads that the supervisor uses to handle requests/responses for worker tasks, along with any other internal asynchronous operation.|no (default == min(10, taskCount))|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.indexing.kafka;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
Expand All @@ -35,6 +34,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;

public class KafkaIndexTaskTuningConfigTest
{
Expand All @@ -43,7 +43,7 @@ public class KafkaIndexTaskTuningConfigTest
public KafkaIndexTaskTuningConfigTest()
{
mapper = new DefaultObjectMapper();
mapper.registerModules((Iterable<Module>) new KafkaIndexTaskModule().getJacksonModules());
mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules());
}

@Test
Expand Down Expand Up @@ -71,7 +71,7 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists());
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.indexing.kafka.supervisor;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.jackson.DefaultObjectMapper;
Expand All @@ -39,7 +38,7 @@ public class KafkaSupervisorTuningConfigTest
public KafkaSupervisorTuningConfigTest()
{
mapper = new DefaultObjectMapper();
mapper.registerModules((Iterable<Module>) new KafkaIndexTaskModule().getJacksonModules());
mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules());
}

@Test
Expand All @@ -66,7 +65,7 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists());
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(java.time.Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
Assert.assertNull(config.getWorkerThreads());
Assert.assertNull(config.getChatThreads());
Assert.assertEquals(8L, (long) config.getChatRetries());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.druid.indexing.kinesis;

import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
Expand All @@ -39,6 +38,7 @@

import java.io.File;
import java.io.IOException;
import java.time.Duration;

public class KinesisIndexTaskTuningConfigTest
{
Expand All @@ -47,7 +47,7 @@ public class KinesisIndexTaskTuningConfigTest
public KinesisIndexTaskTuningConfigTest()
{
mapper = new DefaultObjectMapper();
mapper.registerModules((Iterable<Module>) new KinesisIndexingServiceModule().getJacksonModules());
mapper.registerModules(new KinesisIndexingServiceModule().getJacksonModules());
}

@Rule
Expand Down Expand Up @@ -76,7 +76,7 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
Assert.assertFalse(config.isReportParseExceptions());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
Assert.assertNull(config.getRecordBufferSizeConfigured());
Assert.assertEquals(10000, config.getRecordBufferSizeOrDefault(1_000_000_000, false));
Assert.assertEquals(5000, config.getRecordBufferOfferTimeout());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.druid.indexing.kinesis.supervisor;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule;
import org.apache.druid.jackson.DefaultObjectMapper;
Expand All @@ -38,7 +37,7 @@ public class KinesisSupervisorTuningConfigTest
public KinesisSupervisorTuningConfigTest()
{
mapper = new DefaultObjectMapper();
mapper.registerModules((Iterable<Module>) new KinesisIndexingServiceModule().getJacksonModules());
mapper.registerModules(new KinesisIndexingServiceModule().getJacksonModules());
}

@Test
Expand All @@ -64,7 +63,7 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(java.time.Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
Assert.assertNull(config.getWorkerThreads());
Assert.assertNull(config.getChatThreads());
Assert.assertEquals(8L, (long) config.getChatRetries());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import javax.annotation.Nullable;
import java.io.File;
import java.time.Duration;

/**
*
Expand All @@ -51,7 +52,7 @@ public class RealtimeTuningConfig implements AppenderatorConfig
private static final ShardSpec DEFAULT_SHARD_SPEC = new NumberedShardSpec(0, 1);
private static final IndexSpec DEFAULT_INDEX_SPEC = IndexSpec.DEFAULT;
private static final Boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = Boolean.FALSE;
private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = 0;
private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = Duration.ofMinutes(15).toMillis();
private static final long DEFAULT_ALERT_TIMEOUT = 0;
private static final String DEFAULT_DEDUP_COLUMN = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -82,7 +83,7 @@ public void testSerdeWithDefaults() throws Exception
);

Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(Duration.standardMinutes(15).getMillis(), config.getHandoffConditionTimeout());
Assert.assertEquals(0, config.getAlertTimeout());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists());
Expand Down