Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ public HadoopIndexTask createTask(Interval interval, String version, List<DataSe
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getMaxRowsInMemory(),
tuningConfig.getMaxBytesInMemory(),
tuningConfig.isUseMaxMemoryEstimates(),
tuningConfig.isLeaveIntermediate(),
tuningConfig.isCleanupOnFailure(),
tuningConfig.isOverwriteFiles(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.Configs;
import org.apache.druid.indexer.partitions.DimensionBasedPartitionsSpec;
import org.apache.druid.indexer.partitions.HashedPartitionsSpec;
import org.apache.druid.java.util.common.DateTimes;
Expand All @@ -33,6 +33,7 @@
import org.apache.druid.segment.indexing.TuningConfig;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -62,6 +63,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
DEFAULT_MAX_ROWS_IN_MEMORY_BATCH,
0L,
false,
false,
true,
false,
false,
Expand Down Expand Up @@ -90,6 +92,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final boolean useMaxMemoryEstimates;
private final boolean leaveIntermediate;
private final boolean cleanupOnFailure;
private final boolean overwriteFiles;
Expand Down Expand Up @@ -126,6 +129,7 @@ public HadoopTuningConfig(
final @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
final @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
final @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
final @JsonProperty("useMaxMemoryEstimates") @Nullable Boolean useMaxMemoryEstimates,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") @Nullable Boolean cleanupOnFailure,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
Expand All @@ -147,48 +151,47 @@ public HadoopTuningConfig(
)
{
this.workingPath = workingPath;
this.version = version == null ? DateTimes.nowUtc().toString() : version;
this.partitionsSpec = partitionsSpec == null ? DEFAULT_PARTITIONS_SPEC : partitionsSpec;
this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
this.indexSpec : indexSpecForIntermediatePersists;
this.maxRowsInMemory = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
? DEFAULT_MAX_ROWS_IN_MEMORY_BATCH
: maxRowsInMemoryCOMPAT : maxRowsInMemory;
this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.version = Configs.valueOrDefault(version, DateTimes.nowUtc().toString());
this.partitionsSpec = Configs.valueOrDefault(partitionsSpec, DEFAULT_PARTITIONS_SPEC);
this.shardSpecs = Configs.valueOrDefault(shardSpecs, DEFAULT_SHARD_SPECS);
this.indexSpec = Configs.valueOrDefault(indexSpec, DEFAULT_INDEX_SPEC);
this.indexSpecForIntermediatePersists = Configs.valueOrDefault(
indexSpecForIntermediatePersists,
this.indexSpec
);
this.maxRowsInMemory = Configs.valueOrDefault(
maxRowsInMemory,
Configs.valueOrDefault(maxRowsInMemoryCOMPAT, DEFAULT_MAX_ROWS_IN_MEMORY_BATCH)
);
this.useMaxMemoryEstimates = Configs.valueOrDefault(useMaxMemoryEstimates, false);
this.appendableIndexSpec = Configs.valueOrDefault(appendableIndexSpec, DEFAULT_APPENDABLE_INDEX);
// initializing this to 0, it will be lazily initialized to a value
// @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.maxBytesInMemory = Configs.valueOrDefault(maxBytesInMemory, 0);
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure;
this.cleanupOnFailure = Configs.valueOrDefault(cleanupOnFailure, true);
this.overwriteFiles = overwriteFiles;
this.jobProperties = (jobProperties == null
? ImmutableMap.of()
: ImmutableMap.copyOf(jobProperties));
this.combineText = combineText;
this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner;
this.numBackgroundPersistThreads = numBackgroundPersistThreads == null
? DEFAULT_NUM_BACKGROUND_PERSIST_THREADS
: numBackgroundPersistThreads;
this.useCombiner = Configs.valueOrDefault(useCombiner, DEFAULT_USE_COMBINER);
this.numBackgroundPersistThreads = Configs.valueOrDefault(
numBackgroundPersistThreads,
DEFAULT_NUM_BACKGROUND_PERSIST_THREADS
);
this.forceExtendableShardSpecs = forceExtendableShardSpecs;
Preconditions.checkArgument(this.numBackgroundPersistThreads >= 0, "Not support persistBackgroundCount < 0");
this.useExplicitVersion = useExplicitVersion;
this.allowedHadoopPrefix = allowedHadoopPrefix == null ? ImmutableList.of() : allowedHadoopPrefix;

this.ignoreInvalidRows = ignoreInvalidRows == null ? false : ignoreInvalidRows;
if (maxParseExceptions != null) {
this.maxParseExceptions = maxParseExceptions;
} else {
if (!this.ignoreInvalidRows) {
this.maxParseExceptions = 0;
} else {
this.maxParseExceptions = TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS;
}
}
this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions;
this.allowedHadoopPrefix = Configs.valueOrDefault(allowedHadoopPrefix, Collections.emptyList());

this.useYarnRMJobStatusFallback = useYarnRMJobStatusFallback == null ? true : useYarnRMJobStatusFallback;
this.ignoreInvalidRows = Configs.valueOrDefault(ignoreInvalidRows, false);
this.maxParseExceptions = Configs.valueOrDefault(
maxParseExceptions,
this.ignoreInvalidRows ? TuningConfig.DEFAULT_MAX_PARSE_EXCEPTIONS : 0
);
this.logParseExceptions = Configs.valueOrDefault(logParseExceptions, TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS);
this.useYarnRMJobStatusFallback = Configs.valueOrDefault(useYarnRMJobStatusFallback, true);

if (awaitSegmentAvailabilityTimeoutMillis == null || awaitSegmentAvailabilityTimeoutMillis < 0) {
this.awaitSegmentAvailabilityTimeoutMillis = DEFAULT_AWAIT_SEGMENT_AVAILABILITY_TIMEOUT_MILLIS;
Expand Down Expand Up @@ -263,6 +266,12 @@ public long getMaxBytesInMemory()
return maxBytesInMemory;
}

@JsonProperty
public boolean isUseMaxMemoryEstimates()
{
return useMaxMemoryEstimates;
}

@JsonProperty
public boolean isLeaveIntermediate()
{
Expand Down Expand Up @@ -372,6 +381,7 @@ public HadoopTuningConfig withWorkingPath(String path)
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
useMaxMemoryEstimates,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
Expand Down Expand Up @@ -404,6 +414,7 @@ public HadoopTuningConfig withVersion(String ver)
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
useMaxMemoryEstimates,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
Expand Down Expand Up @@ -436,6 +447,7 @@ public HadoopTuningConfig withShardSpecs(Map<Long, List<HadoopyShardSpec>> specs
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
useMaxMemoryEstimates,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ private static IncrementalIndex makeIncrementalIndex(
.setIndexSchema(indexSchema)
.setMaxRowCount(tuningConfig.getMaxRowsInMemory())
.setMaxBytesInMemory(tuningConfig.getMaxBytesInMemoryOrDefault())
.setUseMaxMemoryEstimates(tuningConfig.isUseMaxMemoryEstimates())
.build();

if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(
false,
false,
false,
false,
null,
false,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ public DetermineHashedPartitionsJobTest(
false,
false,
false,
false,
null,
false,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ public DeterminePartitionsJobTest(
false,
false,
false,
false,
null,
false,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ public DetermineRangePartitionsJobTest(
null,
null,
null,
false,
false, false,
false,
false,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ HadoopIngestionSpec build()
false,
false,
false,
false,
null,
false,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void testSerde() throws Exception
null,
100,
null,
false,
true,
true,
true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,7 @@ public void setUp() throws Exception
null,
maxRowsInMemory,
maxBytesInMemory,
false,
true,
false,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public void setup() throws Exception
false,
false,
false,
false,
//Map of job properties
ImmutableMap.of(
"fs.s3.impl",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class GranularityPathSpecTest
false,
false,
false,
false,
null,
false,
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public abstract class AppendableIndexBuilder
// DruidInputSource since that is the only case where we can have existing metrics.
// This is currently only use by auto compaction and should not be use for anything else.
protected boolean preserveExistingMetrics = false;
protected boolean useMaxMemoryEstimates = true;
protected boolean useMaxMemoryEstimates = false;

protected final Logger log = new Logger(this.getClass());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
import org.apache.druid.guice.annotations.UnstableApi;

/**
 * Describes the in-memory indexing method for data ingestion.
 */
@UnstableApi
public interface AppendableIndexSpec
{
  /**
   * Creates a new builder of the appendable index.
   */
  AppendableIndexBuilder builder();

  /**
   * Returns the default max bytes in memory for this index.
   */
  long getDefaultMaxBytesInMemory();
}