6 changes: 1 addition & 5 deletions docs/content/ingestion/batch-ingestion.md
@@ -103,11 +103,7 @@ The spec\_file is a path to a file that contains JSON and an example looks like:
   "overwriteFiles" : false,
   "ignoreInvalidRows" : false,
   "jobProperties" : { },
-  "combineText" : false,
-  "persistInHeap" : false,
-  "ingestOffheap" : false,
-  "bufferSize" : 134217728,
-  "aggregationBufferRatio" : 0.5,
+  "combineText" : false,
   "rowFlushBoundary" : 300000
   }
 }
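For context on how these keys bind: the tuningConfig block is deserialized through the Jackson @JsonCreator constructor shown later in this diff, so the five removed keys simply no longer map to any @JsonProperty. A minimal, self-contained sketch of that binding style, using a hypothetical mirror class rather than the real HadoopTuningConfig:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// Toy stand-in for the surviving knobs; NOT the real Druid class.
class TuningMirror
{
  final boolean combineText;
  final int rowFlushBoundary;

  @JsonCreator
  TuningMirror(
      @JsonProperty("combineText") boolean combineText,
      @JsonProperty("rowFlushBoundary") Integer rowFlushBoundary
  )
  {
    this.combineText = combineText;
    // 80000 mirrors DEFAULT_ROW_FLUSH_BOUNDARY in HadoopTuningConfig.
    this.rowFlushBoundary = rowFlushBoundary == null ? 80000 : rowFlushBoundary;
  }
}

public class SpecFileSketch
{
  public static void main(String[] args) throws Exception
  {
    String json = "{\"combineText\" : false, \"rowFlushBoundary\" : 300000}";
    TuningMirror tuning = new ObjectMapper().readValue(json, TuningMirror.class);
    System.out.println(tuning.rowFlushBoundary); // 300000
  }
}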
@@ -456,11 +456,6 @@ public Bucket apply(HadoopyShardSpec input)
       }
     }
 
-  public boolean isPersistInHeap()
-  {
-    return schema.getTuningConfig().isPersistInHeap();
-  }
-
   public String getWorkingPath()
   {
     final String workingPath = schema.getTuningConfig().getWorkingPath();
@@ -65,11 +65,7 @@ public boolean run()
     List<Jobby> jobs = Lists.newArrayList();
     JobHelper.ensurePaths(config);
 
-    if (config.isPersistInHeap()) {
-      indexJob = new IndexGeneratorJob(config);
-    } else {
-      indexJob = new LegacyIndexGeneratorJob(config);
-    }
+    indexJob = new IndexGeneratorJob(config);
     jobs.add(indexJob);
 
     if (metadataStorageUpdaterJob != null) {
@@ -24,7 +24,6 @@
 import io.druid.indexer.partitions.HashedPartitionsSpec;
 import io.druid.indexer.partitions.PartitionsSpec;
 import io.druid.segment.IndexSpec;
-import io.druid.segment.data.BitmapSerde;
 import io.druid.segment.indexing.TuningConfig;
 import org.joda.time.DateTime;
 
@@ -37,11 +36,9 @@
 public class HadoopTuningConfig implements TuningConfig
 {
   private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
-  private static final Map<DateTime, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.<DateTime, List<HadoopyShardSpec>>of();
+  private static final Map<DateTime, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.of();
   private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
   private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
-  private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024;
-  private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;
   private static final boolean DEFAULT_USE_COMBINER = false;
 
   public static HadoopTuningConfig makeDefaultTuningConfig()
@@ -59,11 +56,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
         false,
         null,
         false,
-        false,
-        false,
-        DEFAULT_BUFFER_SIZE,
-        DEFAULT_AGG_BUFFER_RATIO,
-        DEFAULT_USE_COMBINER
+        false
     );
   }
 
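For reference, a quick sketch of exercising the slimmed-down defaults; the factory and accessors are taken from this diff, while the package in the import is an assumption based on the imports above:

import io.druid.indexer.HadoopTuningConfig; // assumed package, per the imports above

public class TuningDefaultsSketch
{
  public static void main(String[] args)
  {
    // makeDefaultTuningConfig() now ends with a single `false` (useCombiner)
    // where the four removed buffer/off-heap arguments used to sit.
    HadoopTuningConfig tuning = HadoopTuningConfig.makeDefaultTuningConfig();

    // 80000, per DEFAULT_ROW_FLUSH_BOUNDARY; with the off-heap path gone,
    // this row count is the only remaining in-memory bound per index.
    System.out.println(tuning.getRowFlushBoundary());

    // false, per DEFAULT_USE_COMBINER.
    System.out.println(tuning.getUseCombiner());
  }
}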
@@ -79,10 +72,6 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
   private final boolean ignoreInvalidRows;
   private final Map<String, String> jobProperties;
   private final boolean combineText;
-  private final boolean persistInHeap;
-  private final boolean ingestOffheap;
-  private final int bufferSize;
-  private final float aggregationBufferRatio;
   private final boolean useCombiner;
 
   @JsonCreator
@@ -99,10 +88,6 @@ public HadoopTuningConfig(
       final @JsonProperty("ignoreInvalidRows") boolean ignoreInvalidRows,
       final @JsonProperty("jobProperties") Map<String, String> jobProperties,
       final @JsonProperty("combineText") boolean combineText,
-      final @JsonProperty("persistInHeap") boolean persistInHeap,
-      final @JsonProperty("ingestOffheap") boolean ingestOffheap,
-      final @JsonProperty("bufferSize") Integer bufferSize,
-      final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio,
       final @JsonProperty("useCombiner") Boolean useCombiner
   )
   {
@@ -120,10 +105,6 @@ public HadoopTuningConfig(
         ? ImmutableMap.<String, String>of()
         : ImmutableMap.copyOf(jobProperties));
     this.combineText = combineText;
-    this.persistInHeap = persistInHeap;
-    this.ingestOffheap = ingestOffheap;
-    this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize;
-    this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio;
     this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue();
   }
 
@@ -199,28 +180,6 @@ public boolean isCombineText()
     return combineText;
   }
 
-  @JsonProperty
-  public boolean isPersistInHeap()
-  {
-    return persistInHeap;
-  }
-
-  @JsonProperty
-  public boolean isIngestOffheap(){
-    return ingestOffheap;
-  }
-
-  @JsonProperty
-  public int getBufferSize(){
-    return bufferSize;
-  }
-
-  @JsonProperty
-  public float getAggregationBufferRatio()
-  {
-    return aggregationBufferRatio;
-  }
-
   @JsonProperty
   public boolean getUseCombiner()
   {
@@ -242,10 +201,6 @@ public HadoopTuningConfig withWorkingPath(String path)
         ignoreInvalidRows,
         jobProperties,
         combineText,
-        persistInHeap,
-        ingestOffheap,
-        bufferSize,
-        aggregationBufferRatio,
         useCombiner
     );
   }
@@ -265,10 +220,6 @@ public HadoopTuningConfig withVersion(String ver)
         ignoreInvalidRows,
         jobProperties,
         combineText,
-        persistInHeap,
-        ingestOffheap,
-        bufferSize,
-        aggregationBufferRatio,
         useCombiner
     );
   }
@@ -288,10 +239,6 @@ public HadoopTuningConfig withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> s
         ignoreInvalidRows,
         jobProperties,
         combineText,
-        persistInHeap,
-        ingestOffheap,
-        bufferSize,
-        aggregationBufferRatio,
         useCombiner
     );
   }
@@ -30,19 +30,16 @@
 import com.metamx.common.IAE;
 import com.metamx.common.ISE;
 import com.metamx.common.logger.Logger;
-import io.druid.collections.StupidPool;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.Row;
 import io.druid.data.input.Rows;
 import io.druid.indexer.hadoop.SegmentInputRow;
-import io.druid.offheap.OffheapBufferPool;
 import io.druid.query.aggregation.AggregatorFactory;
 import io.druid.segment.LoggingProgressIndicator;
 import io.druid.segment.ProgressIndicator;
 import io.druid.segment.QueryableIndex;
 import io.druid.segment.incremental.IncrementalIndex;
 import io.druid.segment.incremental.IncrementalIndexSchema;
-import io.druid.segment.incremental.OffheapIncrementalIndex;
 import io.druid.segment.incremental.OnheapIncrementalIndex;
 import io.druid.timeline.DataSegment;
 import org.apache.commons.io.FileUtils;
@@ -157,7 +154,7 @@ public boolean run()
       throw new RuntimeException("No buckets?? seems there is no data to index.");
     }
 
-    if(config.getSchema().getTuningConfig().getUseCombiner()) {
+    if (config.getSchema().getTuningConfig().getUseCombiner()) {
       job.setCombinerClass(IndexGeneratorCombiner.class);
       job.setCombinerKeyGroupingComparatorClass(BytesWritable.Comparator.class);
     }
@@ -200,9 +197,7 @@ public boolean run()
   private static IncrementalIndex makeIncrementalIndex(
       Bucket theBucket,
       AggregatorFactory[] aggs,
-      HadoopDruidIndexerConfig config,
-      boolean isOffHeap,
-      StupidPool bufferPool
+      HadoopDruidIndexerConfig config
   )
   {
     final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
@@ -212,19 +207,11 @@ private static IncrementalIndex makeIncrementalIndex(
         .withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
         .withMetrics(aggs)
         .build();
-    if (isOffHeap) {
-      return new OffheapIncrementalIndex(
-          indexSchema,
-          bufferPool,
-          true,
-          tuningConfig.getBufferSize()
-      );
-    } else {
-      return new OnheapIncrementalIndex(
-          indexSchema,
-          tuningConfig.getRowFlushBoundary()
-      );
-    }
+
+    return new OnheapIncrementalIndex(
+        indexSchema,
+        tuningConfig.getRowFlushBoundary()
+    );
   }
 
   public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, BytesWritable>
@@ -320,20 +307,20 @@ protected void reduce(
       Iterator<BytesWritable> iter = values.iterator();
       BytesWritable first = iter.next();
 
-      if(iter.hasNext()) {
+      if (iter.hasNext()) {
         SortableBytes keyBytes = SortableBytes.fromBytesWritable(key);
         Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
-        IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config, false, null);
+        IncrementalIndex index = makeIncrementalIndex(bucket, combiningAggs, config);
         index.add(InputRowSerde.fromBytes(first.getBytes(), aggregators));
 
-        while(iter.hasNext()) {
+        while (iter.hasNext()) {
           context.progress();
           InputRow value = InputRowSerde.fromBytes(iter.next().getBytes(), aggregators);
 
-          if(!index.canAppendRow()) {
+          if (!index.canAppendRow()) {
             log.info("current index full due to [%s]. creating new index.", index.getOutOfRowsReason());
             flushIndexToContextAndClose(key, index, context);
-            index = makeIncrementalIndex(bucket, combiningAggs, config, false, null);
+            index = makeIncrementalIndex(bucket, combiningAggs, config);
           }
 
           index.add(value);
@@ -345,10 +332,11 @@ protected void reduce(
       }
     }
 
-    private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context) throws IOException, InterruptedException
+    private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex index, Context context)
+        throws IOException, InterruptedException
     {
       Iterator<Row> rows = index.iterator();
-      while(rows.hasNext()) {
+      while (rows.hasNext()) {
         context.progress();
         Row row = rows.next();
         InputRow inputRow = getInputRowFromRow(row, index.getDimensions());
@@ -360,7 +348,8 @@ private void flushIndexToContextAndClose(BytesWritable key, IncrementalIndex ind
       index.close();
     }
 
-    private InputRow getInputRowFromRow(final Row row, final List<String> dimensions) {
+    private InputRow getInputRowFromRow(final Row row, final List<String> dimensions)
+    {
       return new InputRow()
       {
         @Override
@@ -467,14 +456,14 @@ public void progress()
       };
     }
 
-    protected File persist(
+    private File persist(
        final IncrementalIndex index,
        final Interval interval,
        final File file,
        final ProgressIndicator progressIndicator
    ) throws IOException
    {
-      return HadoopDruidIndexerConfig.INDEX_MAKER.persist(
+      return HadoopDruidIndexerConfig.INDEX_MERGER.persist(
          index, interval, file, null, config.getIndexSpec(), progressIndicator
      );
    }
@@ -514,17 +503,11 @@ protected void reduce(
       Bucket bucket = Bucket.fromGroupKey(keyBytes.getGroupKey()).lhs;
 
       final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get();
-      final int maxTotalBufferSize = config.getSchema().getTuningConfig().getBufferSize();
-      final int aggregationBufferSize = (int) ((double) maxTotalBufferSize
-                                               * config.getSchema().getTuningConfig().getAggregationBufferRatio());
 
-      final StupidPool<ByteBuffer> bufferPool = new OffheapBufferPool(aggregationBufferSize);
       IncrementalIndex index = makeIncrementalIndex(
           bucket,
           combiningAggs,
-          config,
-          config.getSchema().getTuningConfig().isIngestOffheap(),
-          bufferPool
+          config
       );
       try {
         File baseFlushFile = File.createTempFile("base", "flush");
@@ -570,9 +553,7 @@ protected void reduce(
             index = makeIncrementalIndex(
                 bucket,
                 combiningAggs,
-                config,
-                config.getSchema().getTuningConfig().isIngestOffheap(),
-                bufferPool
+                config
             );
             startTime = System.currentTimeMillis();
             ++indexCount;
@@ -602,7 +583,7 @@ protected void reduce(
         indexes.add(HadoopDruidIndexerConfig.INDEX_IO.loadIndex(file));
       }
       mergedBase = mergeQueryableIndex(
-           indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
+          indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
       );
     }
     final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())