Merged
1 change: 1 addition & 0 deletions docs/content/ingestion/batch-ingestion.md
@@ -207,6 +207,7 @@ The tuningConfig is optional and default parameters will be used if no tuningConfig is specified.
|useCombiner|Boolean|Use hadoop combiner to merge rows at mapper if possible.|no (default == false)|
|jobProperties|Object|a map of properties to add to the Hadoop job configuration.|no (default == null)|
|buildV9Directly|Boolean|Whether to build v9 index directly instead of building v8 index and convert it to v9 format|no (default = false)|
|persistBackgroundCount|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and CPU usage, but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)|
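For readers of this table, a minimal sketch of where the new property sits in a batch ingestion spec; everything except persistBackgroundCount is just the usual hadoop tuningConfig scaffolding, shown for context only:

```json
"tuningConfig" : {
  "type" : "hadoop",
  "persistBackgroundCount" : 1
}
```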
Member
numBackgroundPersistThreads would be more consistent with our other properties, such as druid.processing.numThreads. We should try to keep property naming consistent.

Contributor Author
@xvrl, this has already been merged; should I file a new PR to rename the properties?


### Partitioning specification

HadoopTuningConfig.java
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.druid.indexer.partitions.HashedPartitionsSpec;
import io.druid.indexer.partitions.PartitionsSpec;
@@ -43,6 +44,7 @@ public class HadoopTuningConfig implements TuningConfig
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final boolean DEFAULT_USE_COMBINER = false;
private static final Boolean DEFAULT_BUILD_V9_DIRECTLY = Boolean.FALSE;
private static final int DEFAULT_PERSIST_BACKGROUND_COUNT = 0;

public static HadoopTuningConfig makeDefaultTuningConfig()
{
@@ -61,7 +63,8 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
false,
false,
null,
DEFAULT_BUILD_V9_DIRECTLY
DEFAULT_BUILD_V9_DIRECTLY,
DEFAULT_PERSIST_BACKGROUND_COUNT
);
}

@@ -79,6 +82,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
private final boolean combineText;
private final boolean useCombiner;
private final Boolean buildV9Directly;
private final int persistBackgroundCount;

@JsonCreator
public HadoopTuningConfig(
@@ -97,7 +101,8 @@ public HadoopTuningConfig(
final @JsonProperty("useCombiner") Boolean useCombiner,
// See https://github.com/druid-io/druid/pull/1922
final @JsonProperty("rowFlushBoundary") Integer maxRowsInMemoryCOMPAT,
final @JsonProperty("buildV9Directly") Boolean buildV9Directly
final @JsonProperty("buildV9Directly") Boolean buildV9Directly,
final @JsonProperty("persistBackgroundCount") Integer persistBackgroundCount
)
{
this.workingPath = workingPath;
@@ -116,6 +121,8 @@ public HadoopTuningConfig(
this.combineText = combineText;
this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue();
this.buildV9Directly = buildV9Directly == null ? DEFAULT_BUILD_V9_DIRECTLY : buildV9Directly;
this.persistBackgroundCount = persistBackgroundCount == null ? DEFAULT_PERSIST_BACKGROUND_COUNT : persistBackgroundCount;
Preconditions.checkArgument(this.persistBackgroundCount >= 0, "Not support persistBackgroundCount < 0");
}

@JsonProperty
@@ -201,6 +208,12 @@ public Boolean getBuildV9Directly() {
return buildV9Directly;
}

@JsonProperty
public int getPersistBackgroundCount()
{
return persistBackgroundCount;
}

public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@@ -218,7 +231,8 @@ public HadoopTuningConfig withWorkingPath(String path)
combineText,
useCombiner,
null,
buildV9Directly
buildV9Directly,
persistBackgroundCount
);
}

@@ -239,7 +253,8 @@ public HadoopTuningConfig withVersion(String ver)
combineText,
useCombiner,
null,
buildV9Directly
buildV9Directly,
persistBackgroundCount
);
}

@@ -260,7 +275,8 @@ public HadoopTuningConfig withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> s
combineText,
useCombiner,
null,
buildV9Directly
buildV9Directly,
persistBackgroundCount
);
}
}
IndexGeneratorJob.java
@@ -29,9 +29,15 @@
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import io.druid.common.guava.ThreadRenamingRunnable;
import io.druid.concurrent.Execs;
import io.druid.data.input.InputRow;
import io.druid.data.input.Row;
import io.druid.data.input.Rows;
@@ -73,6 +79,15 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
*/
@@ -531,6 +546,8 @@ protected void reduce(

final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();

ListeningExecutorService persistExecutor = null;
Contributor
Why do we need ListeningExecutorService? I can't see the need for listeners.

Contributor
Most places where new executor services are created use listening services right now, mostly because they provide more functionality, and realizing later that you should have made it a listening service is a pain. Plus, handy things like Futures.allAsList only work on listenable futures. So in essence, the practice of always using a listening service makes the code easier to add to later, at the cost of depending on an external library instead of just java.* classes. Is there any reason you know of why using listening executor services by default is a bad idea?
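As an editorial aside, a small self-contained sketch of the convenience described above: a ListeningExecutorService returns ListenableFutures from submit(), which can then be combined and awaited with Futures.allAsList. The class and task names here are illustrative only, not code from this PR:

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ListeningExecutorSketch
{
  public static void main(String[] args) throws Exception
  {
    // Wrapping a plain ExecutorService makes submit() return ListenableFutures.
    ListeningExecutorService exec =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));

    List<ListenableFuture<?>> futures = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      final int taskId = i;
      futures.add(exec.submit(new Runnable()
      {
        @Override
        public void run()
        {
          System.out.println("persist task " + taskId);
        }
      }));
    }

    // allAsList only accepts ListenableFutures; plain java.util.concurrent futures
    // would have to be awaited one by one instead.
    Futures.allAsList(futures).get(1, TimeUnit.HOURS);

    exec.shutdown();
  }
}
```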

Contributor
Didn't realize that we use ListeningExecutorService by default; ok then.

List<ListenableFuture<?>> persistFutures = Lists.newArrayList();
IncrementalIndex index = makeIncrementalIndex(
bucket,
combiningAggs,
@@ -550,6 +567,35 @@ protected void reduce(

Set<String> allDimensionNames = Sets.newLinkedHashSet();
final ProgressIndicator progressIndicator = makeProgressIndicator(context);
int persistBackgroundCount = config.getSchema().getTuningConfig().getPersistBackgroundCount();
if (persistBackgroundCount > 0) {
final BlockingQueue<Runnable> queue = new SynchronousQueue<>();
ExecutorService executorService = new ThreadPoolExecutor(
persistBackgroundCount,
persistBackgroundCount,
0L,
TimeUnit.MILLISECONDS,
queue,
Execs.makeThreadFactory("IndexGeneratorJob_persist_%d"),
new RejectedExecutionHandler()
Contributor
Can you use Execs.newBlockingSingleThreaded(..) instead?

Contributor Author
Execs.newBlockingSingleThreaded(..) only has one background thread to persist the incremental index, so I have not used it.

Contributor
ok

{
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
{
try {
executor.getQueue().put(r);
}
catch (InterruptedException e) {
Contributor
Thread.currentThread().interrupt() to reset the interrupted flag status?

Thread.currentThread().interrupt();
throw new RejectedExecutionException("Got Interrupted while adding to the Queue");
}
}
}
);
persistExecutor = MoreExecutors.listeningDecorator(executorService);
} else {
persistExecutor = MoreExecutors.sameThreadExecutor();
}

for (final BytesWritable bw : values) {
context.progress();
@@ -575,9 +621,29 @@ protected void reduce(
toMerge.add(file);

context.progress();
persist(index, interval, file, progressIndicator);
// close this index and make a new one, reusing same buffer
index.close();
final IncrementalIndex persistIndex = index;
persistFutures.add(
persistExecutor.submit(
new ThreadRenamingRunnable(String.format("%s-persist", file.getName()))
{
@Override
public void doRun()
{
try {
persist(persistIndex, interval, file, progressIndicator);
}
catch (Exception e) {
log.error("persist index error", e);
throw Throwables.propagate(e);
}
finally {
// close this index
persistIndex.close();
Contributor
After the runnable exits, does the future still hold a non-weak reference to persistIndex?

Contributor Author
@drcrallen How would persistIndex be reachable from the future? I do not understand.

Contributor Author
After the runnable exits, the future does not hold a non-weak reference to persistIndex; see:

public class ListenableFutureTask<V> extends FutureTask<V>
    implements ListenableFuture<V> {
}

public class FutureTask<V> implements RunnableFuture<V> {

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        ...
        callable = null;        // to reduce footprint
    }
}

When the runnable exits, the future nulls out its callable.

Contributor
Cool. Thanks for digging in on that.

Contributor
The runnable is still held somewhere though (in the future?), which would hold onto a persistIndex.

Is there a way you can check to make sure you don't accidentally hold onto all incremental indices in the futures?

Contributor Author
The future holds a reference to the runnable, which holds onto a persistIndex.
When the runnable exits, the future nulls out its callable, so it no longer references the runnable; the runnable and persistIndex become garbage and will be collected by GC.
We have already used this feature in our Hadoop index build jobs and have seen no problems.
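To address the retention question above, here is a hedged, best-effort check (an editorial sketch, not code from this PR): the payload array stands in for a persistIndex, and because System.gc() is only a hint the output is indicative rather than a guarantee.

```java
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.lang.ref.WeakReference;
import java.util.concurrent.Executors;

public class FutureReferenceCheck
{
  private static WeakReference<Object> payloadRef;

  public static void main(String[] args) throws Exception
  {
    ListeningExecutorService exec =
        MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

    // Keep the future itself alive, mirroring the persistFutures list in the reducer.
    ListenableFuture<?> future = submitTask(exec);
    future.get(); // after completion, FutureTask nulls out its callable

    System.gc(); // only a hint; this makes the check best-effort, not deterministic
    Thread.sleep(1000L);
    System.out.println("future still done and held: " + future.isDone());
    System.out.println("payload collected:          " + (payloadRef.get() == null));

    exec.shutdown();
  }

  private static ListenableFuture<?> submitTask(ListeningExecutorService exec)
  {
    final byte[] payload = new byte[16 * 1024 * 1024]; // stand-in for a persistIndex
    payloadRef = new WeakReference<Object>(payload);
    return exec.submit(new Runnable()
    {
      @Override
      public void run()
      {
        payload[0] = 1; // pretend to persist the payload
      }
    });
  }
}
```

If the completed future still reached the payload, the second line would print false even though the future itself remains strongly held.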

}
}
}
)
);

index = makeIncrementalIndex(
bucket,
@@ -611,6 +677,9 @@ protected void reduce(
toMerge.add(finalFile);
}

Futures.allAsList(persistFutures).get(1, TimeUnit.HOURS);
Contributor
Do we need to hardcode a timeout here? Wouldn't Hadoop automatically kill the containers if no progress is being made?

Contributor
On second thought, it is probably OK, as 1 hour should be practically infinite in this case.

persistExecutor.shutdown();

for (File file : toMerge) {
indexes.add(HadoopDruidIndexerConfig.INDEX_IO.loadIndex(file));
}
@@ -665,8 +734,17 @@ indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
FileUtils.deleteDirectory(file);
}
}
catch (ExecutionException e) {
throw Throwables.propagate(e);
}
catch (TimeoutException e) {
throw Throwables.propagate(e);
}
finally {
index.close();
if (persistExecutor != null) {
persistExecutor.shutdownNow();
}
}
}
}
@@ -382,6 +382,7 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map<String, Object
false,
false,
null,
null,
null
)
)
@@ -161,6 +161,7 @@ public DetermineHashedPartitionsJobTest(String dataFilePath, long targetPartitio
false,
false,
null,
null,
null
)
);
@@ -265,6 +265,7 @@ public DeterminePartitionsJobTest(
false,
false,
null,
null,
null
)
)
@@ -208,6 +208,7 @@ public void testHashedBucketSelection()
false,
false,
null,
null,
null
)
);
@@ -54,6 +54,7 @@ public void testSerde() throws Exception
true,
true,
null,
null,
null
);

@@ -72,6 +73,7 @@ public void testSerde() throws Exception
Assert.assertEquals(ImmutableMap.<String, String>of(), actual.getJobProperties());
Assert.assertEquals(true, actual.isCombineText());
Assert.assertEquals(true, actual.getUseCombiner());
Assert.assertEquals(0, actual.getPersistBackgroundCount());

}

@@ -502,7 +502,8 @@ public void setUp() throws Exception
false,
useCombiner,
null,
buildV9Directly
buildV9Directly,
null
)
)
);
@@ -116,6 +116,7 @@ public void setup() throws Exception
false,
false,
null,
null,
null
)
)
@@ -120,7 +120,7 @@ public void testAddInputPath() throws Exception
jsonMapper
),
new HadoopIOConfig(null, null, null),
new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null)
new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null, null)
);

granularityPathSpec.setDataGranularity(Granularity.HOUR);
@@ -202,6 +202,7 @@ public InputStream openStream() throws IOException
false,
false,
null,
null,
null
)
)