Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,7 @@ Additional peon configs include:
|`druid.peon.mode`|Choices are "local" and "remote". Setting this to local means you intend to run the peon as a standalone process (Not recommended).|remote|
|`druid.indexer.task.baseDir`|Base temporary working directory.|`System.getProperty("java.io.tmpdir")`|
|`druid.indexer.task.baseTaskDir`|Base temporary working directory for tasks.|`${druid.indexer.task.baseDir}/persistent/task`|
|`druid.indexer.task.useLegacyBatchProcessing`|If false, native batch ingestion will use a new, recommended, code path with memory optimized code for the segment creation phase. If true it will use the previous code path for the create segments phase of batch ingestion. This does not apply to streaming ingestion, just to batch. This setting should only be used when a bug is suspected or found in the new optimized batch ingestion code. If a bug is suspected or found, you can set this flag to `true` to fall back to previous, working but more memory intensive, code path.|`false`|
|`druid.indexer.task.batchProcessingMode`| Batch ingestion tasks have three operating modes that control how intermediary segments are constructed and tracked: `OPEN_SEGMENTS`, `CLOSED_SEGMENTS`, and `CLOSED_SEGMENTS_SINKS`. `OPEN_SEGMENTS` uses code based on the original batch ingestion path and performs a `mmap` on intermediary segments to build a timeline so that these segments can be queried by realtime queries. This is not needed at all for batch, so the default mode, `CLOSED_SEGMENTS`, eliminates `mmap` of intermediary segments, but still tracks the entire set of segments in heap. The `CLOSED_SEGMENTS_SINKS` mode is the most aggressive and should have the smallest memory footprint; it works by eliminating both in-memory tracking and `mmap` of intermediary segments produced during segment creation. This mode isn't as well tested as the other modes, so it is currently considered experimental. `OPEN_SEGMENTS` mode can be selected if any problems occur with the two newer modes. |`CLOSED_SEGMENTS`|
|`druid.indexer.task.defaultHadoopCoordinates`|Hadoop version to use with HadoopIndexTasks that do not request a particular version.|org.apache.hadoop:hadoop-client:2.8.5|
|`druid.indexer.task.defaultRowFlushBoundary`|Highest row count before persisting to disk. Used for indexing generating tasks.|75000|
|`druid.indexer.task.directoryLockTimeout`|Wait this long for zombie peons to exit before giving up on their replacements.|PT10M|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2783,7 +2783,8 @@ private void makeToolboxFactory() throws IOException
null,
null,
false,
false
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name()
);
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createDataSourceTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2869,7 +2869,8 @@ private void makeToolboxFactory() throws IOException
null,
null,
false,
false
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name()
);
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createDataSourceTable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.EnumUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.joda.time.Period;

Expand All @@ -39,10 +41,22 @@
*/
public class TaskConfig
{
private static final Logger log = new Logger(TaskConfig.class);

public static final List<String> DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of(
"org.apache.hadoop:hadoop-client:2.8.5"
);

/**
 * Processing mode of the batch ingestion "segment creation" phase (i.e. appenderator logic).
 */
public enum BatchProcessingMode
{
OPEN_SEGMENTS, /* Legacy code path: mmap segments */
CLOSED_SEGMENTS, /* Do not mmap segments, but keep most other legacy code */
CLOSED_SEGMENTS_SINKS /* Most aggressive memory optimization: do not mmap segments and eliminate sinks, etc. */
}

public static final BatchProcessingMode BATCH_PROCESSING_MODE_DEFAULT = BatchProcessingMode.CLOSED_SEGMENTS;

private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M");
private static final Period DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT = new Period("PT5M");

Expand Down Expand Up @@ -77,7 +91,10 @@ public class TaskConfig
private final boolean ignoreTimestampSpecForDruidInputSource;

@JsonProperty
private final boolean useLegacyBatchProcessing;
private final boolean batchMemoryMappedIndex;

@JsonProperty
private final BatchProcessingMode batchProcessingMode;

@JsonCreator
public TaskConfig(
Expand All @@ -91,7 +108,8 @@ public TaskConfig(
@JsonProperty("directoryLockTimeout") Period directoryLockTimeout,
@JsonProperty("shuffleDataLocations") List<StorageLocationConfig> shuffleDataLocations,
@JsonProperty("ignoreTimestampSpecForDruidInputSource") boolean ignoreTimestampSpecForDruidInputSource,
@JsonProperty("useLegacyBatchProcessing") boolean useLegacyBatchProcessing // only set to true to fall back to older behavior
@JsonProperty("batchMemoryMappedIndex") boolean batchMemoryMappedIndex, // deprecated, only set to true to fall back to older behavior
@JsonProperty("batchProcessingMode") String batchProcessingMode
)
{
this.baseDir = baseDir == null ? System.getProperty("java.io.tmpdir") : baseDir;
Expand All @@ -117,7 +135,23 @@ public TaskConfig(
this.shuffleDataLocations = shuffleDataLocations;
}
this.ignoreTimestampSpecForDruidInputSource = ignoreTimestampSpecForDruidInputSource;
this.useLegacyBatchProcessing = useLegacyBatchProcessing;

this.batchMemoryMappedIndex = batchMemoryMappedIndex;
// Conflict resolution: false is the default for batchMemoryMappedIndex, so if it is true assume the user
// set it intentionally to request the legacy path; in that case override batchProcessingMode and use
// OPEN_SEGMENTS. Otherwise ignore batchMemoryMappedIndex and just use batchProcessingMode:
if (batchMemoryMappedIndex) {
this.batchProcessingMode = BatchProcessingMode.OPEN_SEGMENTS;
} else if (EnumUtils.isValidEnum(BatchProcessingMode.class, batchProcessingMode)) {
this.batchProcessingMode = BatchProcessingMode.valueOf(batchProcessingMode);
} else {
// batchProcessingMode input string is invalid, log & use the default.
this.batchProcessingMode = BatchProcessingMode.CLOSED_SEGMENTS; // Default
log.warn("Batch processing mode argument value is null or not valid:[%s], defaulting to[%s] ",
batchProcessingMode, this.batchProcessingMode
);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we log the batchProcessingMode value after this if block?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah

log.info("Batch processing mode:[%s]", this.batchProcessingMode);
}

@JsonProperty
Expand Down Expand Up @@ -201,9 +235,19 @@ public boolean isIgnoreTimestampSpecForDruidInputSource()
}

@JsonProperty
public boolean getuseLegacyBatchProcessing()
public BatchProcessingMode getBatchProcessingMode()
{
return batchProcessingMode;
}

/**
* Do not use in code! Use {@link TaskConfig#getBatchProcessingMode()} instead.
*/
@Deprecated
@JsonProperty
public boolean getbatchMemoryMappedIndex()
{
return useLegacyBatchProcessing;
return batchMemoryMappedIndex;
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
import org.apache.druid.segment.incremental.RowIngestionMeters;
import org.apache.druid.segment.indexing.DataSchema;
Expand Down Expand Up @@ -70,21 +72,51 @@ public static Appenderator newAppenderator(
ParseExceptionHandler parseExceptionHandler
)
{
return appenderatorsManager.createOfflineAppenderatorForTask(
taskId,
dataSchema,
appenderatorConfig.withBasePersistDirectory(toolbox.getPersistDir()),
metrics,
segmentPusher,
toolbox.getJsonMapper(),
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler,
toolbox.getConfig().getuseLegacyBatchProcessing()
);
if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.OPEN_SEGMENTS) {
return appenderatorsManager.createOpenSegmentsOfflineAppenderatorForTask(
taskId,
dataSchema,
appenderatorConfig.withBasePersistDirectory(toolbox.getPersistDir()),
metrics,
segmentPusher,
toolbox.getJsonMapper(),
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler
);
} else if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS) {
return appenderatorsManager.createClosedSegmentsOfflineAppenderatorForTask(
taskId,
dataSchema,
appenderatorConfig.withBasePersistDirectory(toolbox.getPersistDir()),
metrics,
segmentPusher,
toolbox.getJsonMapper(),
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler
);
} else if (toolbox.getConfig().getBatchProcessingMode() == TaskConfig.BatchProcessingMode.CLOSED_SEGMENTS_SINKS) {
return appenderatorsManager.createOfflineAppenderatorForTask(
taskId,
dataSchema,
appenderatorConfig.withBasePersistDirectory(toolbox.getPersistDir()),
metrics,
segmentPusher,
toolbox.getJsonMapper(),
toolbox.getIndexIO(),
toolbox.getIndexMergerV9(),
rowIngestionMeters,
parseExceptionHandler
);
} else {
throw new IAE("Invalid batchProcesingMode[%s]", toolbox.getConfig().getBatchProcessingMode());
}
}


public static BatchAppenderatorDriver newDriver(
final Appenderator appenderator,
final TaskToolbox toolbox,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public void setUp() throws IOException
null,
null,
false,
false
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name()
),
new DruidNode("druid/middlemanager", "localhost", false, 8091, null, true, false),
mockTaskActionClientFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1518,7 +1518,8 @@ public SegmentPublishResult announceHistoricalSegments(
null,
null,
false,
false
false,
TaskConfig.BATCH_PROCESSING_MODE_DEFAULT.name()
);

final TaskActionToolbox taskActionToolbox = new TaskActionToolbox(
Expand Down
Loading