Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/content/ingestion/batch-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|useCombiner|Boolean|Use hadoop combiner to merge rows at mapper if possible.|no (default == false)|
|jobProperties|Object|a map of properties to add to the Hadoop job configuration.|no (default == null)|
|buildV9Directly|Boolean|Whether to build v9 index directly instead of building v8 index and convert it to v9 format|no (default = false)|
|persistBackgroundCount|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage, but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)|
|numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage, but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)|

### Partitioning specification

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class HadoopTuningConfig implements TuningConfig
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final boolean DEFAULT_USE_COMBINER = false;
private static final Boolean DEFAULT_BUILD_V9_DIRECTLY = Boolean.FALSE;
private static final int DEFAULT_PERSIST_BACKGROUND_COUNT = 0;
private static final int DEFAULT_NUM_BACKGROUND_PERSIST_THREADS = 0;

public static HadoopTuningConfig makeDefaultTuningConfig()
{
Expand All @@ -64,7 +64,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
false,
null,
DEFAULT_BUILD_V9_DIRECTLY,
DEFAULT_PERSIST_BACKGROUND_COUNT
DEFAULT_NUM_BACKGROUND_PERSIST_THREADS
);
}

Expand All @@ -82,7 +82,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
private final boolean combineText;
private final boolean useCombiner;
private final Boolean buildV9Directly;
private final int persistBackgroundCount;
private final int numBackgroundPersistThreads;

@JsonCreator
public HadoopTuningConfig(
Expand All @@ -102,7 +102,7 @@ public HadoopTuningConfig(
// See https://github.com/druid-io/druid/pull/1922
final @JsonProperty("rowFlushBoundary") Integer maxRowsInMemoryCOMPAT,
final @JsonProperty("buildV9Directly") Boolean buildV9Directly,
final @JsonProperty("persistBackgroundCount") Integer persistBackgroundCount
final @JsonProperty("numBackgroundPersistThreads") Integer numBackgroundPersistThreads
)
{
this.workingPath = workingPath;
Expand All @@ -121,8 +121,8 @@ public HadoopTuningConfig(
this.combineText = combineText;
this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue();
this.buildV9Directly = buildV9Directly == null ? DEFAULT_BUILD_V9_DIRECTLY : buildV9Directly;
this.persistBackgroundCount = persistBackgroundCount == null ? DEFAULT_PERSIST_BACKGROUND_COUNT : persistBackgroundCount;
Preconditions.checkArgument(this.persistBackgroundCount >= 0, "Not support persistBackgroundCount < 0");
this.numBackgroundPersistThreads = numBackgroundPersistThreads == null ? DEFAULT_NUM_BACKGROUND_PERSIST_THREADS : numBackgroundPersistThreads;
Preconditions.checkArgument(this.numBackgroundPersistThreads >= 0, "Not support persistBackgroundCount < 0");
}

@JsonProperty
Expand Down Expand Up @@ -209,9 +209,9 @@ public Boolean getBuildV9Directly() {
}

@JsonProperty
public int getPersistBackgroundCount()
public int getNumBackgroundPersistThreads()
{
return persistBackgroundCount;
return numBackgroundPersistThreads;
}

public HadoopTuningConfig withWorkingPath(String path)
Expand All @@ -232,7 +232,7 @@ public HadoopTuningConfig withWorkingPath(String path)
useCombiner,
null,
buildV9Directly,
persistBackgroundCount
numBackgroundPersistThreads
);
}

Expand All @@ -254,7 +254,7 @@ public HadoopTuningConfig withVersion(String ver)
useCombiner,
null,
buildV9Directly,
persistBackgroundCount
numBackgroundPersistThreads
);
}

Expand All @@ -276,7 +276,7 @@ public HadoopTuningConfig withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> s
useCombiner,
null,
buildV9Directly,
persistBackgroundCount
numBackgroundPersistThreads
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,12 @@ protected void reduce(

Set<String> allDimensionNames = Sets.newLinkedHashSet();
final ProgressIndicator progressIndicator = makeProgressIndicator(context);
int persistBackgroundCount = config.getSchema().getTuningConfig().getPersistBackgroundCount();
if (persistBackgroundCount > 0) {
int numBackgroundPersistThreads = config.getSchema().getTuningConfig().getNumBackgroundPersistThreads();
if (numBackgroundPersistThreads > 0) {
final BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(
persistBackgroundCount,
persistBackgroundCount,
numBackgroundPersistThreads,
numBackgroundPersistThreads,
0L,
TimeUnit.MILLISECONDS,
queue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void testSerde() throws Exception
Assert.assertEquals(ImmutableMap.<String, String>of(), actual.getJobProperties());
Assert.assertEquals(true, actual.isCombineText());
Assert.assertEquals(true, actual.getUseCombiner());
Assert.assertEquals(0, actual.getPersistBackgroundCount());
Assert.assertEquals(0, actual.getNumBackgroundPersistThreads());

}

Expand Down