Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ public void mergeV9(Blackhole blackhole) throws Exception
schemaInfo.getAggsArray(),
tmpFile,
new IndexSpec(),
null
null,
-1
);

blackhole.consume(mergedFile);
Expand Down
1 change: 1 addition & 0 deletions docs/ingestion/native-batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|maxRowsPerSegment|Deprecated. Use `partitionsSpec` instead. Used in sharding. Determines how many rows are in each segment.|5000000|no|
|maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally the user does not need to set this, but depending on the nature of the data, if rows are short in terms of bytes, the user may not want to store a million rows in memory and this value should be set.|1000000|no|
|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and the user does not need to set it. This value represents the number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
|maxColumnsToMerge|A parameter that limits how many segments can be merged in a single phase when merging segments for publishing. This limit is imposed on the total number of columns present in a set of segments being merged. If the limit is exceeded, segment merging will occur in multiple phases. At least 2 segments will be merged in a single phase, regardless of this setting.|-1 (unlimited)|no|
|maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting to be pushed. Used in determining when intermediate pushing should occur.|20000000|no|
|numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create when using a `hashed` `partitionsSpec`. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
|splitHintSpec|Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of the input source. See [Split hint spec](#split-hint-spec) for more details.|size-based split hint spec|no|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ protected File mergeQueryableIndex(
{
boolean rollup = config.getSchema().getDataSchema().getGranularitySpec().isRollup();
return HadoopDruidIndexerConfig.INDEX_MERGER_V9
.mergeQueryableIndex(indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator, null);
.mergeQueryableIndex(indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator, null, -1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ static ParallelIndexTuningConfig getTuningConfig(TuningConfig tuningConfig)
null,
indexTuningConfig.isLogParseExceptions(),
indexTuningConfig.getMaxParseExceptions(),
indexTuningConfig.getMaxSavedParseExceptions()
indexTuningConfig.getMaxSavedParseExceptions(),
indexTuningConfig.getMaxColumnsToMerge()
);
} else {
throw new ISE(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.segment.IndexMerger;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.ParseExceptionHandler;
Expand Down Expand Up @@ -1122,6 +1123,7 @@ public static class IndexTuningConfig implements AppenderatorConfig
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final int maxColumnsToMerge;

// null if all partitionsSpec related params are null. see getDefaultPartitionsSpec() for details.
@Nullable
Expand Down Expand Up @@ -1210,7 +1212,8 @@ public IndexTuningConfig(
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
)
{
this(
Expand All @@ -1235,7 +1238,8 @@ public IndexTuningConfig(
segmentWriteOutMediumFactory,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
maxSavedParseExceptions,
maxColumnsToMerge
);

Preconditions.checkArgument(
Expand All @@ -1246,7 +1250,7 @@ public IndexTuningConfig(

private IndexTuningConfig()
{
this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
}

private IndexTuningConfig(
Expand All @@ -1264,14 +1268,18 @@ private IndexTuningConfig(
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@Nullable Boolean logParseExceptions,
@Nullable Integer maxParseExceptions,
@Nullable Integer maxSavedParseExceptions
@Nullable Integer maxSavedParseExceptions,
@Nullable Integer maxColumnsToMerge
)
{
this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
// initializing this to 0, it will be lazily initialized to a value
// @see #getMaxBytesInMemoryOrDefault()
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.maxColumnsToMerge = maxColumnsToMerge == null
? IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE
: maxColumnsToMerge;
this.partitionsSpec = partitionsSpec;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
Expand Down Expand Up @@ -1320,7 +1328,8 @@ public IndexTuningConfig withBasePersistDirectory(File dir)
segmentWriteOutMediumFactory,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
maxSavedParseExceptions,
maxColumnsToMerge
);
}

Expand All @@ -1341,7 +1350,8 @@ public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec)
segmentWriteOutMediumFactory,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
maxSavedParseExceptions,
maxColumnsToMerge
);
}

Expand Down Expand Up @@ -1442,6 +1452,13 @@ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
return segmentWriteOutMediumFactory;
}

/**
 * Returns the limit on the total number of columns across the segments merged in a single
 * merge phase. The default is -1 (unlimited), applied when the {@code maxColumnsToMerge}
 * JSON property is absent.
 */
@Override
@JsonProperty
public int getMaxColumnsToMerge()
{
return maxColumnsToMerge;
}

@JsonProperty
public boolean isLogParseExceptions()
{
Expand Down Expand Up @@ -1532,6 +1549,7 @@ public boolean equals(Object o)
return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
maxRowsInMemory == that.maxRowsInMemory &&
maxBytesInMemory == that.maxBytesInMemory &&
maxColumnsToMerge == that.maxColumnsToMerge &&
maxPendingPersists == that.maxPendingPersists &&
forceGuaranteedRollup == that.forceGuaranteedRollup &&
reportParseExceptions == that.reportParseExceptions &&
Expand All @@ -1553,6 +1571,7 @@ public int hashCode()
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
maxColumnsToMerge,
partitionsSpec,
indexSpec,
indexSpecForIntermediatePersists,
Expand All @@ -1574,6 +1593,7 @@ public String toString()
return "IndexTuningConfig{" +
"maxRowsInMemory=" + maxRowsInMemory +
", maxBytesInMemory=" + maxBytesInMemory +
", maxColumnsToMerge=" + maxColumnsToMerge +
", partitionsSpec=" + partitionsSpec +
", indexSpec=" + indexSpec +
", indexSpecForIntermediatePersists=" + indexSpecForIntermediatePersists +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,8 @@ private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningC
tuningConfig.getSegmentWriteOutMediumFactory(),
tuningConfig.isLogParseExceptions(),
tuningConfig.getMaxParseExceptions(),
tuningConfig.getMaxSavedParseExceptions()
tuningConfig.getMaxSavedParseExceptions(),
tuningConfig.getMaxColumnsToMerge()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static ParallelIndexTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
Expand Down Expand Up @@ -131,7 +132,8 @@ public ParallelIndexTuningConfig(
@JsonProperty("totalNumMergeTasks") @Nullable Integer totalNumMergeTasks,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
)
{
super(
Expand All @@ -155,7 +157,8 @@ public ParallelIndexTuningConfig(
segmentWriteOutMediumFactory,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
maxSavedParseExceptions,
maxColumnsToMerge
);

if (maxNumSubTasks != null && maxNumConcurrentSubTasks != null) {
Expand Down Expand Up @@ -276,7 +279,8 @@ public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpe
getTotalNumMergeTasks(),
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions()
getMaxSavedParseExceptions(),
getMaxColumnsToMerge()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,8 @@ private static Pair<File, List<String>> mergeSegmentsInSamePartition(
dataSchema.getAggregators(),
outDir,
tuningConfig.getIndexSpec(),
tuningConfig.getSegmentWriteOutMediumFactory()
tuningConfig.getSegmentWriteOutMediumFactory(),
tuningConfig.getMaxColumnsToMerge()
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException
100,
null,
null,
null,
null
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ public void testRunWithHashPartitioning() throws Exception
null,
null,
null,
null,
null
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ private static ParallelIndexTuningConfig createTuningConfig()
null,
null,
null,
null,
null
);
}
Expand Down Expand Up @@ -462,6 +463,7 @@ public void testSerdeWithOldTuningConfigSuccessfullyDeserializeToNewOne() throws
null,
null,
null,
null,
null
),
null,
Expand Down Expand Up @@ -614,6 +616,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio
null,
null,
null,
null,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
Expand Down Expand Up @@ -682,6 +685,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm
null,
null,
null,
null,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
Expand Down Expand Up @@ -750,6 +754,7 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment
null,
null,
null,
null,
null
);
final List<ParallelIndexIngestionSpec> ingestionSpecs = CompactionTask.createIngestionSchema(
Expand Down Expand Up @@ -1139,6 +1144,7 @@ private void assertIngestionSchema(
null,
null,
null,
null,
null
),
expectedSegmentGranularity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ public void testSerdeTuningConfigWithDynamicPartitionsSpec() throws IOException
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
true,
10,
100
100,
1234
);
assertSerdeTuningConfig(tuningConfig);
}
Expand Down Expand Up @@ -116,7 +117,8 @@ public void testSerdeTuningConfigWithHashedPartitionsSpec() throws IOException
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
true,
10,
100
100,
null
);
assertSerdeTuningConfig(tuningConfig);
}
Expand Down Expand Up @@ -150,7 +152,8 @@ public void testSerdeTuningConfigWithDeprecatedDynamicPartitionsSpec() throws IO
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
true,
10,
100
100,
null
);
assertSerdeTuningConfig(tuningConfig);
}
Expand Down Expand Up @@ -184,7 +187,8 @@ public void testSerdeTuningConfigWithDeprecatedHashedPartitionsSpec() throws IOE
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
true,
10,
100
100,
1234
);
assertSerdeTuningConfig(tuningConfig);
}
Expand Down Expand Up @@ -220,7 +224,8 @@ public void testForceGuaranteedRollupWithDynamicPartitionsSpec()
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
true,
10,
100
100,
null
);
}

Expand Down Expand Up @@ -255,7 +260,8 @@ public void testBestEffortRollupWithHashedPartitionsSpec()
OffHeapMemorySegmentWriteOutMediumFactory.instance(),
true,
10,
100
100,
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception
null,
true,
7,
7
7,
null
);

final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
Expand Down Expand Up @@ -1264,7 +1265,8 @@ public void testMultipleParseExceptionsFailure() throws Exception
null,
true,
2,
5
5,
null
);

final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
Expand Down Expand Up @@ -1385,7 +1387,8 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc
null,
true,
2,
5
5,
null
);

final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null);
Expand Down Expand Up @@ -1830,7 +1833,8 @@ static IndexTuningConfig createTuningConfig(
null,
null,
null,
1
1,
null
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ public void testIndexTaskSerde() throws Exception
null,
null,
null,
null,
null
)
),
Expand Down Expand Up @@ -348,6 +349,7 @@ public void testIndexTaskwithResourceSerde() throws Exception
null,
null,
null,
null,
null
)
),
Expand Down
Loading