@@ -30,7 +30,6 @@
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.QueryableIndex;
@@ -79,7 +78,6 @@ public class IndexMergeBenchmark

private static final Logger log = new Logger(IndexMergeBenchmark.class);
private static final int RNG_SEED = 9999;
-private static final IndexMerger INDEX_MERGER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
@@ -101,7 +99,6 @@ public int columnCacheSizeBytes()
}
}
);
-INDEX_MERGER = new IndexMerger(JSON_MAPPER, INDEX_IO);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
}

@@ -170,33 +167,6 @@ private IncrementalIndex makeIncIndex()
.buildOnheap();
}

-@Benchmark
-@BenchmarkMode(Mode.AverageTime)
-@OutputTimeUnit(TimeUnit.MICROSECONDS)
-public void merge(Blackhole blackhole) throws Exception
-{
-File tmpFile = File.createTempFile("IndexMergeBenchmark-MERGEDFILE-" + System.currentTimeMillis(), ".TEMPFILE");
-tmpFile.delete();
-tmpFile.mkdirs();
-try {
-log.info(tmpFile.getAbsolutePath() + " isFile: " + tmpFile.isFile() + " isDir:" + tmpFile.isDirectory());
-
-File mergedFile = INDEX_MERGER.mergeQueryableIndex(
-indexesToMerge,
-rollup,
-schemaInfo.getAggsArray(),
-tmpFile,
-new IndexSpec()
-);
-
-blackhole.consume(mergedFile);
-}
-finally {
-tmpFile.delete();
-}
-
-}
-
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@@ -30,7 +30,6 @@
import io.druid.java.util.common.logger.Logger;
import io.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
import io.druid.segment.IndexIO;
-import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.IndexSpec;
import io.druid.segment.column.ColumnConfig;
@@ -81,7 +80,6 @@ public class IndexPersistBenchmark
private BenchmarkSchemaInfo schemaInfo;


-private static final IndexMerger INDEX_MERGER;
private static final IndexMergerV9 INDEX_MERGER_V9;
private static final IndexIO INDEX_IO;
public static final ObjectMapper JSON_MAPPER;
@@ -99,7 +97,6 @@ public int columnCacheSizeBytes()
}
}
);
-INDEX_MERGER = new IndexMerger(JSON_MAPPER, INDEX_IO);
INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO);
}

@@ -164,28 +161,6 @@ private IncrementalIndex makeIncIndex()
.buildOnheap();
}

-@Benchmark
-@BenchmarkMode(Mode.AverageTime)
-@OutputTimeUnit(TimeUnit.MICROSECONDS)
-public void persist(Blackhole blackhole) throws Exception
-{
-File tmpDir = Files.createTempDir();
-log.info("Using temp dir: " + tmpDir.getAbsolutePath());
-try {
-File indexFile = INDEX_MERGER.persist(
-incIndex,
-tmpDir,
-new IndexSpec()
-);
-
-blackhole.consume(indexFile);
-}
-finally {
-FileUtils.deleteDirectory(tmpDir);
-}
-
-}
-
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@@ -120,7 +120,6 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)|
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no|
-|`buildV9Directly`|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)|
|`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)|
|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. This option is deprecated. Use `completionTimeout` of KafkaSupervisorIOConfig instead.|no (default == 0)|
|`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on the `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false, ingestion for that particular partition will halt and manual intervention is required to correct the situation; see the `Reset Supervisor` API below.|no (default == false)|
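
The heap bound in the `maxPendingPersists` row above is easy to misread, so a worked example helps. This is a sketch of the arithmetic only; the 75000 figure is the documented Kafka indexing default for `maxRowsInMemory`, and both numbers are illustrative rather than part of this change.

```java
public class PendingPersistBound
{
  public static void main(String[] args)
  {
    // Per the table: heap usage for indexing scales with
    // maxRowsInMemory * (2 + maxPendingPersists). The "2" presumably covers
    // the index currently being filled plus the one currently persisting.
    int maxRowsInMemory = 75000;
    int maxPendingPersists = 0; // default: one running persist, none queued

    long worstCaseRows = (long) maxRowsInMemory * (2 + maxPendingPersists);
    System.out.println("Worst-case rows held on heap: " + worstCaseRows); // 150000
  }
}
```

Each additional pending persist therefore costs another full `maxRowsInMemory` worth of heap on top of the baseline of two.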
1 change: 0 additions & 1 deletion docs/content/ingestion/batch-ingestion.md
@@ -163,7 +163,6 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|useCombiner|Boolean|Use a Hadoop combiner to merge rows at the mapper if possible.|no (default == false)|
|jobProperties|Object|A map of properties to add to the Hadoop job configuration, see below for details.|no (default == null)|
|indexSpec|Object|Tune how data is indexed. See below for more information.|no|
-|buildV9Directly|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)|
|numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)|
|forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|no (default = false)|
|useExplicitVersion|Boolean|Forces HadoopIndexTask to use version.|no (default = false)|
1 change: 0 additions & 1 deletion docs/content/ingestion/stream-pull.md
@@ -150,7 +150,6 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|rejectionPolicy|Object|Controls the data acceptance policy for creating and handing off segments. More on this below.|no (default == 'serverTime')|
|maxPendingPersists|Integer|Maximum number of persists that can be pending, but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0; meaning one persist can be running concurrently with ingestion, and none can be queued up)|
|shardSpec|Object|This describes the shard that is represented by this server. This must be specified properly in order to have multiple realtime nodes indexing the same data stream in a [sharded fashion](#sharding).|no (default == 'NoneShardSpec')|
-|buildV9Directly|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)|
|persistThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the persisting thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default == 0; inherit and do not override)|
|mergeThreadPriority|int|If `-XX:+UseThreadPriorities` is properly enabled, this will set the thread priority of the merging thread to `Thread.NORM_PRIORITY` plus this value within the bounds of `Thread.MIN_PRIORITY` and `Thread.MAX_PRIORITY`. A value of 0 indicates to not change the thread priority.|no (default == 0; inherit and do not override)|
|reportParseExceptions|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion. If false, unparseable rows and fields will be skipped. If an entire row is skipped, the "unparseable" counter will be incremented. If some fields in a row were parseable and some were not, the parseable fields will be indexed and the "unparseable" counter will not be incremented.|no (default == false)|
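
The `persistThreadPriority` and `mergeThreadPriority` rows describe an offset from `Thread.NORM_PRIORITY` clamped to the legal priority range. A minimal sketch of that arithmetic, as read from the table rather than from this patch:

```java
public class ThreadPriorityOffset
{
  // Resolve the effective priority for a persist or merge thread from the
  // configured offset, clamped to [Thread.MIN_PRIORITY, Thread.MAX_PRIORITY].
  static int resolvePriority(int configuredOffset)
  {
    if (configuredOffset == 0) {
      // 0 means "inherit and do not override", per the table
      return Thread.currentThread().getPriority();
    }
    int raw = Thread.NORM_PRIORITY + configuredOffset;
    return Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, raw));
  }

  public static void main(String[] args)
  {
    System.out.println(resolvePriority(-2)); // NORM_PRIORITY (5) - 2 = 3
    System.out.println(resolvePriority(99)); // clamped to MAX_PRIORITY (10)
  }
}
```

As the table notes, the JVM must be started with `-XX:+UseThreadPriorities` for any of this to take effect.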
4 changes: 0 additions & 4 deletions docs/content/ingestion/tasks.md
@@ -117,7 +117,6 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the pass over the data that determines intervals and partitions. numShards cannot be specified if targetPartitionSize is set.|null|no|
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
|maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no|
-|buildV9Directly|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|true|no|
|forceExtendableShardSpecs|Forces use of extendable shardSpecs. Experimental feature intended for use with the [Kafka indexing service extension](../development/extensions-core/kafka-ingestion.html).|false|no|
|reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no|
|publishTimeout|Milliseconds to wait for publishing segments. It must be >= 0, where 0 means to wait forever.|0|no|
@@ -162,7 +161,6 @@ Append tasks append a list of segments together into a single segment (one after
"id": <task_id>,
"dataSource": <task_datasource>,
"segments": <JSON list of DataSegment objects to append>,
"buildV9Directly": <true or false, default true>,
"aggregations": <optional list of aggregators>
}
```
@@ -181,7 +179,6 @@ The grammar is:
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"buildV9Directly": <true or false, default true>,
"segments": <JSON list of DataSegment objects to merge>
}
```
@@ -199,7 +196,6 @@ The grammar is:
"dataSource": <task_datasource>,
"aggregations": <list of aggregators>,
"rollup": <whether or not to rollup data during a merge>,
"buildV9Directly": <true or false, default true>,
"interval": <DataSegment objects in this interval are going to be merged>
}
```
@@ -869,7 +869,7 @@ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox
toolbox.getSegmentPusher(),
toolbox.getObjectMapper(),
toolbox.getIndexIO(),
-tuningConfig.getBuildV9Directly() ? toolbox.getIndexMergerV9() : toolbox.getIndexMerger(),
+toolbox.getIndexMergerV9(),
toolbox.getQueryRunnerFactoryConglomerate(),
toolbox.getSegmentAnnouncer(),
toolbox.getEmitter(),
@@ -40,7 +40,6 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
private final File basePersistDirectory;
private final int maxPendingPersists;
private final IndexSpec indexSpec;
-private final boolean buildV9Directly;
private final boolean reportParseExceptions;
@Deprecated
private final long handoffConditionTimeout;
@@ -54,6 +53,7 @@ public KafkaTuningConfig(
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
+// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@@ -71,7 +71,6 @@ public KafkaTuningConfig(
this.basePersistDirectory = defaults.getBasePersistDirectory();
this.maxPendingPersists = maxPendingPersists == null ? defaults.getMaxPendingPersists() : maxPendingPersists;
this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
-this.buildV9Directly = buildV9Directly == null ? defaults.getBuildV9Directly() : buildV9Directly;
this.reportParseExceptions = reportParseExceptions == null
? defaults.isReportParseExceptions()
: reportParseExceptions;
@@ -92,7 +91,7 @@ public static KafkaTuningConfig copyOf(KafkaTuningConfig config)
config.basePersistDirectory,
config.maxPendingPersists,
config.indexSpec,
-config.buildV9Directly,
+true,
config.reportParseExceptions,
config.handoffConditionTimeout,
config.resetOffsetAutomatically
@@ -140,10 +139,14 @@ public IndexSpec getIndexSpec()
return indexSpec;
}

+/**
+* Always returns true; this setting no longer affects the version being built.
+*/
+@Deprecated
@JsonProperty
public boolean getBuildV9Directly()
{
-return buildV9Directly;
+return true;
}

@Override
@@ -175,7 +178,7 @@ public KafkaTuningConfig withBasePersistDirectory(File dir)
dir,
maxPendingPersists,
indexSpec,
-buildV9Directly,
+true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically
@@ -191,7 +194,7 @@ public KafkaTuningConfig withMaxRowsInMemory(int rows)
basePersistDirectory,
maxPendingPersists,
indexSpec,
-buildV9Directly,
+true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically
@@ -219,9 +222,6 @@ public boolean equals(Object o)
if (maxPendingPersists != that.maxPendingPersists) {
return false;
}
-if (buildV9Directly != that.buildV9Directly) {
-return false;
-}
if (reportParseExceptions != that.reportParseExceptions) {
return false;
}
@@ -254,7 +254,6 @@ public int hashCode()
result = 31 * result + (basePersistDirectory != null ? basePersistDirectory.hashCode() : 0);
result = 31 * result + maxPendingPersists;
result = 31 * result + (indexSpec != null ? indexSpec.hashCode() : 0);
-result = 31 * result + (buildV9Directly ? 1 : 0);
result = 31 * result + (reportParseExceptions ? 1 : 0);
result = 31 * result + (int) (handoffConditionTimeout ^ (handoffConditionTimeout >>> 32));
result = 31 * result + (resetOffsetAutomatically ? 1 : 0);
@@ -271,7 +270,6 @@ public String toString()
", basePersistDirectory=" + basePersistDirectory +
", maxPendingPersists=" + maxPendingPersists +
", indexSpec=" + indexSpec +
", buildV9Directly=" + buildV9Directly +
", reportParseExceptions=" + reportParseExceptions +
", handoffConditionTimeout=" + handoffConditionTimeout +
", resetOffsetAutomatically=" + resetOffsetAutomatically +
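
The `getBuildV9Directly` change above is the crux of this patch: the property is still accepted and serialized so existing specs keep deserializing, but its value is pinned to `true` and dropped from `equals`, `hashCode`, and `toString`. A stripped-down sketch of this Jackson retirement pattern, using a hypothetical `ExampleTuningConfig` rather than an actual Druid class:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class ExampleTuningConfig
{
  private final int maxRowsInMemory;

  @JsonCreator
  public ExampleTuningConfig(
      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
      // Still declared so that old JSON containing "buildV9Directly" binds
      // cleanly; the value itself is deliberately ignored.
      @JsonProperty("buildV9Directly") Boolean buildV9Directly
  )
  {
    this.maxRowsInMemory = maxRowsInMemory == null ? 75000 : maxRowsInMemory;
  }

  @JsonProperty
  public int getMaxRowsInMemory()
  {
    return maxRowsInMemory;
  }

  @Deprecated
  @JsonProperty
  public boolean getBuildV9Directly()
  {
    return true; // pinned; the option no longer changes behavior
  }
}
```

Leaving the retired field out of `equals` and `hashCode`, as the hunks above do, means two configs that differ only in the dead option still compare as equal.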
@@ -43,6 +43,7 @@ public KafkaSupervisorTuningConfig(
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
+// This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12.
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, // for backward compatibility
@@ -62,7 +63,7 @@ public KafkaSupervisorTuningConfig(
basePersistDirectory,
maxPendingPersists,
indexSpec,
-buildV9Directly,
+true,
reportParseExceptions,
// Supervised kafka tasks should respect KafkaSupervisorIOConfig.completionTimeout instead of
// handoffConditionTimeout
@@ -124,7 +125,6 @@ public String toString()
", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
", indexSpec=" + getIndexSpec() +
", buildV9Directly=" + getBuildV9Directly() +
", reportParseExceptions=" + isReportParseExceptions() +
", handoffConditionTimeout=" + getHandoffConditionTimeout() +
", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
@@ -44,7 +44,6 @@
import com.metamx.metrics.MonitorScheduler;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
-import io.druid.java.util.common.StringUtils;
import io.druid.concurrent.Execs;
import io.druid.data.input.impl.DimensionsSpec;
import io.druid.data.input.impl.JSONParseSpec;
@@ -75,6 +74,7 @@
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.granularity.Granularities;
import io.druid.java.util.common.guava.Sequences;
import io.druid.metadata.EntryExistsException;
@@ -131,8 +131,6 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;

import java.io.File;
import java.io.IOException;
@@ -146,7 +144,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

-@RunWith(Parameterized.class)
public class KafkaIndexTaskTest
{
private static final Logger log = new Logger(KafkaIndexTaskTest.class);
@@ -160,7 +157,6 @@ public class KafkaIndexTaskTest
private static int topicPostfix;

private final List<Task> runningTasks = Lists.newArrayList();
-private final boolean buildV9Directly;

private long handoffConditionTimeout = 0;
private boolean reportParseExceptions = false;
@@ -221,17 +217,6 @@ private static String getTopicName()
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();

-@Parameterized.Parameters(name = "buildV9Directly = {0}")
-public static Iterable<Object[]> constructorFeeder()
-{
-return ImmutableList.of(new Object[]{true}, new Object[]{false});
-}
-
-public KafkaIndexTaskTest(boolean buildV9Directly)
-{
-this.buildV9Directly = buildV9Directly;
-}
-
@Rule
public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();

@@ -1401,7 +1386,7 @@ private KafkaIndexTask createTask(
null,
null,
null,
-buildV9Directly,
+true,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically
@@ -1566,7 +1551,6 @@ public List<StorageLocationConfig> getLocations()
)
),
testUtils.getTestObjectMapper(),
-testUtils.getTestIndexMerger(),
testUtils.getTestIndexIO(),
MapCache.create(1024),
new CacheConfig(),