Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public HadoopIndexTask createTask(Interval interval, String version, List<DataSe
tuningConfig.getIndexSpec(),
tuningConfig.getIndexSpecForIntermediatePersists(),
tuningConfig.getAppendableIndexSpec(),
tuningConfig.getRowFlushBoundary(),
tuningConfig.getMaxRowsInMemory(),
tuningConfig.getMaxBytesInMemory(),
tuningConfig.isLeaveIntermediate(),
tuningConfig.isCleanupOnFailure(),
Expand All @@ -191,7 +191,7 @@ public HadoopIndexTask createTask(Interval interval, String version, List<DataSe
tuningConfig.getJobProperties(),
tuningConfig.isCombineText(),
tuningConfig.getUseCombiner(),
tuningConfig.getRowFlushBoundary(),
tuningConfig.getMaxRowsInMemory(),
tuningConfig.getBuildV9Directly(),
tuningConfig.getNumBackgroundPersistThreads(),
tuningConfig.isForceExtendableShardSpecs(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class HadoopTuningConfig implements TuningConfig
private static final DimensionBasedPartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.defaultSpec();
private static final Map<Long, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.of();
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY;
private static final boolean DEFAULT_USE_COMBINER = false;
private static final int DEFAULT_NUM_BACKGROUND_PERSIST_THREADS = 0;

Expand All @@ -58,7 +57,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
DEFAULT_INDEX_SPEC,
DEFAULT_INDEX_SPEC,
DEFAULT_APPENDABLE_INDEX,
DEFAULT_ROW_FLUSH_BOUNDARY,
DEFAULT_MAX_ROWS_IN_MEMORY,
0L,
false,
true,
Expand Down Expand Up @@ -86,7 +85,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
private final IndexSpec indexSpec;
private final IndexSpec indexSpecForIntermediatePersists;
private final AppendableIndexSpec appendableIndexSpec;
private final int rowFlushBoundary;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final boolean leaveIntermediate;
private final boolean cleanupOnFailure;
Expand Down Expand Up @@ -141,8 +140,8 @@ public HadoopTuningConfig(
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ?
this.indexSpec : indexSpecForIntermediatePersists;
this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
? DEFAULT_ROW_FLUSH_BOUNDARY
this.maxRowsInMemory = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
? DEFAULT_MAX_ROWS_IN_MEMORY
: maxRowsInMemoryCOMPAT : maxRowsInMemory;
this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
// initializing this to 0, it will be lazily initialized to a value
Expand Down Expand Up @@ -192,6 +191,7 @@ public String getVersion()
return version;
}

@Override
@JsonProperty
public DimensionBasedPartitionsSpec getPartitionsSpec()
{
Expand All @@ -204,12 +204,14 @@ public Map<Long, List<HadoopyShardSpec>> getShardSpecs()
return shardSpecs;
}

@Override
@JsonProperty
public IndexSpec getIndexSpec()
{
return indexSpec;
}

@Override
@JsonProperty
public IndexSpec getIndexSpecForIntermediatePersists()
{
Expand All @@ -223,10 +225,11 @@ public AppendableIndexSpec getAppendableIndexSpec()
return appendableIndexSpec;
}

@JsonProperty("maxRowsInMemory")
public int getRowFlushBoundary()
@Override
@JsonProperty
public int getMaxRowsInMemory()
{
return rowFlushBoundary;
return maxRowsInMemory;
}

@JsonProperty
Expand Down Expand Up @@ -341,7 +344,7 @@ public HadoopTuningConfig withWorkingPath(String path)
indexSpec,
indexSpecForIntermediatePersists,
appendableIndexSpec,
rowFlushBoundary,
maxRowsInMemory,
maxBytesInMemory,
leaveIntermediate,
cleanupOnFailure,
Expand Down Expand Up @@ -372,7 +375,7 @@ public HadoopTuningConfig withVersion(String ver)
indexSpec,
indexSpecForIntermediatePersists,
appendableIndexSpec,
rowFlushBoundary,
maxRowsInMemory,
maxBytesInMemory,
leaveIntermediate,
cleanupOnFailure,
Expand Down Expand Up @@ -403,7 +406,7 @@ public HadoopTuningConfig withShardSpecs(Map<Long, List<HadoopyShardSpec>> specs
indexSpec,
indexSpecForIntermediatePersists,
appendableIndexSpec,
rowFlushBoundary,
maxRowsInMemory,
maxBytesInMemory,
leaveIntermediate,
cleanupOnFailure,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ private static IncrementalIndex makeIncrementalIndex(
// Build the incremental-index according to the spec that was chosen by the user
IncrementalIndex newIndex = tuningConfig.getAppendableIndexSpec().builder()
.setIndexSchema(indexSchema)
.setMaxRowCount(tuningConfig.getRowFlushBoundary())
.setMaxRowCount(tuningConfig.getMaxRowsInMemory())
.setMaxBytesInMemory(tuningConfig.getMaxBytesInMemoryOrDefault())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void testSerde() throws Exception
Assert.assertEquals(ImmutableMap.<Long, List<HadoopyShardSpec>>of(), actual.getShardSpecs());
Assert.assertEquals(new IndexSpec(), actual.getIndexSpec());
Assert.assertEquals(new IndexSpec(), actual.getIndexSpecForIntermediatePersists());
Assert.assertEquals(100, actual.getRowFlushBoundary());
Assert.assertEquals(100, actual.getMaxRowsInMemory());
Assert.assertEquals(true, actual.isLeaveIntermediate());
Assert.assertEquals(true, actual.isCleanupOnFailure());
Assert.assertEquals(true, actual.isOverwriteFiles());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;

Expand All @@ -43,6 +45,11 @@ public interface TuningConfig
*/
AppendableIndexSpec getAppendableIndexSpec();

/**
* Maximum number of rows in memory before persisting to local storage
*/
int getMaxRowsInMemory();

/**
* Maximum number of bytes (estimated) to store in memory before persisting to local storage
*/
Expand All @@ -66,4 +73,10 @@ default long getMaxBytesInMemoryOrDefault()
return Long.MAX_VALUE;
}
}

PartitionsSpec getPartitionsSpec();

IndexSpec getIndexSpec();

IndexSpec getIndexSpecForIntermediatePersists();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.apache.druid.segment.realtime.appenderator;

import org.apache.druid.indexer.partitions.PartitionsSpec;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
Expand All @@ -32,11 +30,6 @@ public interface AppenderatorConfig extends TuningConfig
{
boolean isReportParseExceptions();

/**
* Maximum number of rows in memory before persisting to local storage
*/
int getMaxRowsInMemory();

int getMaxPendingPersists();

/**
Expand All @@ -57,17 +50,11 @@ default Long getMaxTotalRows()
throw new UnsupportedOperationException("maxTotalRows is not implemented.");
}

PartitionsSpec getPartitionsSpec();

/**
* Period that sets frequency to persist to local storage if no other thresholds are met
*/
Period getIntermediatePersistPeriod();

IndexSpec getIndexSpec();

IndexSpec getIndexSpecForIntermediatePersists();

File getBasePersistDirectory();

AppenderatorConfig withBasePersistDirectory(File basePersistDirectory);
Expand Down