4 changes: 3 additions & 1 deletion docs/content/ingestion/batch-ingestion.md
@@ -104,7 +104,8 @@ The spec_file is a path to a file that contains JSON and an example looks like:
"ignoreInvalidRows" : false,
"jobProperties" : { },
"combineText" : false,
"rowFlushBoundary" : 300000
"rowFlushBoundary" : 300000,
"buildV9Directly" : false
}
}
```
@@ -205,6 +206,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|ignoreInvalidRows|Boolean|Ignore rows found to have problems.|no (default == false)|
|useCombiner|Boolean|Use the Hadoop combiner to merge rows at the mapper if possible.|no (default == false)|
|jobProperties|Object|A map of properties to add to the Hadoop job configuration.|no (default == null)|
|buildV9Directly|Boolean|Whether to build the v9 index directly instead of first building a v8 index and then converting it to the v9 format.|no (default == false)|

### Partitioning specification

1 change: 1 addition & 0 deletions docs/content/ingestion/realtime-ingestion.md
@@ -142,6 +142,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|rejectionPolicy|Object|Sets the data acceptance policy used when creating and handing off segments. More on this below.|no (default=='serverTime')|
|maxPendingPersists|Integer|Maximum number of persists that can be pending, but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0; meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a [sharded fashion](#sharding).|no (default == 'NoneShardSpec')|
|buildV9Directly|Boolean|Whether to build the v9 index directly instead of first building a v8 index and then converting it to the v9 format.|no (default == false)|

#### Rejection Policy

Expand Up @@ -25,8 +25,6 @@
import io.druid.segment.column.ColumnBuilder;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.ObjectStrategy;
import io.druid.segment.serde.ColumnPartSerde;
import io.druid.segment.serde.ComplexColumnPartSerde;
import io.druid.segment.serde.ComplexColumnPartSupplier;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
@@ -67,11 +65,10 @@ public Object extractValue(InputRow inputRow, String metricName)
}

@Override
public ColumnPartSerde deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
public void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder)
Contributor
since we are changing the behavior of this method, can we please add a comment on the interface about how the method is supposed to be used?

{
GenericIndexed<Sketch> ge = GenericIndexed.read(buffer, strategy);
builder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), ge));
return new ComplexColumnPartSerde(ge, getTypeName());
}

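The review comment above asks for the new contract to be documented on the interface. A minimal javadoc sketch of what that could look like (the wording is illustrative, not taken from this PR; `ComplexMetricSerde` is the abstract class that declares this method):

```java
import java.nio.ByteBuffer;

import io.druid.segment.column.ColumnBuilder;

public abstract class ComplexMetricSerde
{
  /**
   * Deserialize a complex column from the buffer and register it on the given
   * builder via builder.setComplexColumn(...). Note the changed contract:
   * implementations no longer return a ColumnPartSerde; writing the column
   * out is handled separately.
   *
   * @param buffer  buffer positioned at the start of the column data
   * @param builder builder that receives the deserialized column
   */
  public abstract void deserializeColumn(ByteBuffer buffer, ColumnBuilder builder);
}
```
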
@Override
Expand Up @@ -24,8 +24,6 @@
import io.druid.segment.column.ColumnBuilder;
import io.druid.segment.data.GenericIndexed;
import io.druid.segment.data.ObjectStrategy;
import io.druid.segment.serde.ColumnPartSerde;
import io.druid.segment.serde.ComplexColumnPartSerde;
import io.druid.segment.serde.ComplexColumnPartSupplier;
import io.druid.segment.serde.ComplexMetricExtractor;
import io.druid.segment.serde.ComplexMetricSerde;
@@ -92,15 +90,12 @@ public ApproximateHistogram extractValue(InputRow inputRow, String metricName)
}

@Override
public ColumnPartSerde deserializeColumn(
public void deserializeColumn(
ByteBuffer byteBuffer, ColumnBuilder columnBuilder
)
{
final GenericIndexed column = GenericIndexed.read(byteBuffer, getObjectStrategy());

columnBuilder.setComplexColumn(new ComplexColumnPartSupplier(getTypeName(), column));

return new ComplexColumnPartSerde(column, getTypeName());
}

public ObjectStrategy getObjectStrategy()
@@ -51,6 +51,7 @@
import io.druid.initialization.Initialization;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.indexing.granularity.GranularitySpec;
import io.druid.server.DruidNode;
@@ -89,6 +90,7 @@ public class HadoopDruidIndexerConfig
public static final ObjectMapper JSON_MAPPER;
public static final IndexIO INDEX_IO;
public static final IndexMerger INDEX_MERGER;
public static final IndexMergerV9 INDEX_MERGER_V9;

private static final String DEFAULT_WORKING_PATH = "/tmp/druid-indexing";

@@ -112,6 +114,7 @@ public void configure(Binder binder)
JSON_MAPPER = injector.getInstance(ObjectMapper.class);
INDEX_IO = injector.getInstance(IndexIO.class);
INDEX_MERGER = injector.getInstance(IndexMerger.class);
INDEX_MERGER_V9 = injector.getInstance(IndexMergerV9.class);
}

public static enum IndexJobCounters
@@ -351,6 +354,11 @@ public HadoopyShardSpec getShardSpec(Bucket bucket)
return schema.getTuningConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
}

public boolean isBuildV9Directly()
{
return schema.getTuningConfig().getBuildV9Directly();
Member
this will throw an NPE if getBuildV9Directly returns null

}

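The NPE concern above could be addressed with a defensive accessor. A sketch (hypothetical; as merged, `HadoopTuningConfig`'s constructor normalizes the value, so `getBuildV9Directly()` should never return null):

```java
// Null-safe variant of the accessor (not part of this diff):
public boolean isBuildV9Directly()
{
  final Boolean buildV9Directly = schema.getTuningConfig().getBuildV9Directly();
  return buildV9Directly != null && buildV9Directly;
}
```
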
/**
* Job instance should have Configuration set (by calling {@link #addJobProperties(Job)}
* or via injected system properties) before this method is called. The {@link PathSpec} may
@@ -42,6 +42,7 @@ public class HadoopTuningConfig implements TuningConfig
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final boolean DEFAULT_USE_COMBINER = false;
private static final Boolean DEFAULT_BUILD_V9_DIRECTLY = Boolean.FALSE;

public static HadoopTuningConfig makeDefaultTuningConfig()
{
@@ -59,7 +60,8 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
null,
false,
false,
null
null,
DEFAULT_BUILD_V9_DIRECTLY
);
}

@@ -76,6 +78,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
private final Map<String, String> jobProperties;
private final boolean combineText;
private final boolean useCombiner;
private final Boolean buildV9Directly;

@JsonCreator
public HadoopTuningConfig(
@@ -93,7 +96,8 @@ public HadoopTuningConfig(
final @JsonProperty("combineText") boolean combineText,
final @JsonProperty("useCombiner") Boolean useCombiner,
// See https://github.com/druid-io/druid/pull/1922
final @JsonProperty("rowFlushBoundary") Integer maxRowsInMemoryCOMPAT
final @JsonProperty("rowFlushBoundary") Integer maxRowsInMemoryCOMPAT,
final @JsonProperty("buildV9Directly") Boolean buildV9Directly
)
{
this.workingPath = workingPath;
@@ -111,6 +115,7 @@ public HadoopTuningConfig(
: ImmutableMap.copyOf(jobProperties));
this.combineText = combineText;
this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue();
this.buildV9Directly = buildV9Directly == null ? DEFAULT_BUILD_V9_DIRECTLY : buildV9Directly;
}

@JsonProperty
@@ -191,6 +196,11 @@ public boolean getUseCombiner()
return useCombiner;
}

@JsonProperty
public Boolean getBuildV9Directly() {
Member
since buildV9Directly is never null, this should probably be boolean instead of Boolean

Member
and also renamed to isBuildV9Directly

return buildV9Directly;
}

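Taken together, the two review comments above amount to something like the following (a sketch of the suggested change, not what this diff contains):

```java
// Suggested shape: primitive field plus a renamed getter; @JsonProperty keeps
// the serialized name "buildV9Directly" (hypothetical, not part of this PR).
private final boolean buildV9Directly;

@JsonProperty("buildV9Directly")
public boolean isBuildV9Directly()
{
  return buildV9Directly;
}
```
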
public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
@@ -207,7 +217,8 @@ public HadoopTuningConfig withWorkingPath(String path)
jobProperties,
combineText,
useCombiner,
null
null,
buildV9Directly
);
}

@@ -227,7 +238,8 @@ public HadoopTuningConfig withVersion(String ver)
jobProperties,
combineText,
useCombiner,
null
null,
buildV9Directly
);
}

@@ -247,7 +259,8 @@ public HadoopTuningConfig withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> s
jobProperties,
combineText,
useCombiner,
null
null,
buildV9Directly
);
}
}
@@ -454,6 +454,7 @@ protected ProgressIndicator makeProgressIndicator(final Context context)
@Override
public void progress()
{
super.progress();
context.progress();
}
};
@@ -466,9 +467,15 @@ private File persist(
final ProgressIndicator progressIndicator
) throws IOException
{
return HadoopDruidIndexerConfig.INDEX_MERGER.persist(
index, interval, file, null, config.getIndexSpec(), progressIndicator
);
if (config.isBuildV9Directly()) {
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.persist(
index, interval, file, null, config.getIndexSpec(), progressIndicator
);
} else {
return HadoopDruidIndexerConfig.INDEX_MERGER.persist(
index, interval, file, null, config.getIndexSpec(), progressIndicator
);
}
}

protected File mergeQueryableIndex(
@@ -478,9 +485,15 @@ protected File mergeQueryableIndex(
ProgressIndicator progressIndicator
) throws IOException
{
return HadoopDruidIndexerConfig.INDEX_MERGER.mergeQueryableIndex(
indexes, aggs, file, config.getIndexSpec(), progressIndicator
);
if (config.isBuildV9Directly()) {
return HadoopDruidIndexerConfig.INDEX_MERGER_V9.mergeQueryableIndex(
indexes, aggs, file, config.getIndexSpec(), progressIndicator
);
} else {
return HadoopDruidIndexerConfig.INDEX_MERGER.mergeQueryableIndex(
indexes, aggs, file, config.getIndexSpec(), progressIndicator
);
}
}

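Both `persist()` and `mergeQueryableIndex()` branch on the same flag, so the selection could be factored out. A possible simplification (my sketch, assuming `IndexMergerV9` extends `IndexMerger`, which is what allows the two to be used interchangeably here):

```java
// Hypothetical helper, not part of this PR:
private IndexMerger getIndexMerger()
{
  return config.isBuildV9Directly()
         ? HadoopDruidIndexerConfig.INDEX_MERGER_V9
         : HadoopDruidIndexerConfig.INDEX_MERGER;
}
```
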
@Override
@@ -586,7 +599,7 @@ protected void reduce(
indexes.add(HadoopDruidIndexerConfig.INDEX_IO.loadIndex(file));
}
mergedBase = mergeQueryableIndex(
indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator
);
}
final FileSystem outputFS = new Path(config.getSchema().getIOConfig().getSegmentOutputPath())
@@ -381,6 +381,7 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map<String, Object
null,
false,
false,
null,
null
)
)
@@ -160,6 +160,7 @@ public DetermineHashedPartitionsJobTest(String dataFilePath, long targetPartitio
null,
false,
false,
null,
null
)
);
@@ -264,6 +264,7 @@ public DeterminePartitionsJobTest(
null,
false,
false,
null,
null
)
)
@@ -207,6 +207,7 @@ public void testHashedBucketSelection()
null,
false,
false,
null,
null
)
);
@@ -53,6 +53,7 @@ public void testSerde() throws Exception
null,
true,
true,
null,
null
);

@@ -395,6 +395,7 @@ public void setUp() throws Exception
ImmutableMap.of(JobContext.NUM_REDUCES, "0"), //verifies that set num reducers is ignored
false,
useCombiner,
null,
null
)
)
@@ -115,6 +115,7 @@ public void setup() throws Exception
),
false,
false,
null,
null
)
)
@@ -120,7 +120,7 @@ public void testAddInputPath() throws Exception
jsonMapper
),
new HadoopIOConfig(null, null, null),
new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null)
new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null)
);

granularityPathSpec.setDataGranularity(Granularity.HOUR);
@@ -201,6 +201,7 @@ public InputStream openStream() throws IOException
null,
false,
false,
null,
null
)
)
@@ -37,6 +37,7 @@
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.DataSegmentMover;
@@ -80,7 +81,7 @@ public class TaskToolbox
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;

private final IndexMergerV9 indexMergerV9;

public TaskToolbox(
TaskConfig config,
@@ -102,7 +103,8 @@ public TaskToolbox(
IndexMerger indexMerger,
IndexIO indexIO,
Cache cache,
CacheConfig cacheConfig
CacheConfig cacheConfig,
IndexMergerV9 indexMergerV9
)
{
this.config = config;
@@ -125,6 +127,7 @@ public TaskToolbox(
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
}

public TaskConfig getConfig()
@@ -247,4 +250,8 @@ public CacheConfig getCacheConfig()
{
return cacheConfig;
}

public IndexMergerV9 getIndexMergerV9() {
return indexMergerV9;
}
}
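
With the merger exposed on the toolbox, a task can choose between the two implementations the same way the Hadoop indexer does. A hypothetical call site (the `buildV9Directly` flag on the task's tuning config and the surrounding variables are assumptions, and `IndexMergerV9` is again assumed to extend `IndexMerger`):

```java
// Sketch of task-side usage (hypothetical, not part of this PR):
final IndexMerger merger = tuningConfig.getBuildV9Directly()
                           ? toolbox.getIndexMergerV9()
                           : toolbox.getIndexMerger();
merger.persist(index, interval, outDir, null, indexSpec, progressIndicator);
```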