From 34e3e5707ef5ba12fa9d77d989bb60d1425d1333 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Fri, 19 May 2017 19:35:07 +0800 Subject: [PATCH 1/3] introduce SinkFactory in tunningConfig --- .../indexing/kafka/KafkaTuningConfig.java | 30 +++++-- .../kafka/supervisor/KafkaSupervisorSpec.java | 1 + .../KafkaSupervisorTuningConfig.java | 6 +- .../indexing/kafka/KafkaIndexTaskTest.java | 3 +- .../indexing/kafka/KafkaTuningConfigTest.java | 1 + .../kafka/supervisor/KafkaSupervisorTest.java | 1 + .../common/index/YeOldePlumberSchool.java | 2 +- .../druid/indexing/common/task/IndexTask.java | 26 ++++-- .../indexing/common/task/IndexTaskTest.java | 1 + .../common/task/RealtimeIndexTaskTest.java | 1 + .../indexing/common/task/TaskSerdeTest.java | 5 +- .../indexing/overlord/TaskLifecycleTest.java | 7 +- .../indexing/RealtimeTuningConfig.java | 24 +++++- .../appenderator/AppenderatorConfig.java | 3 + .../appenderator/AppenderatorImpl.java | 4 +- .../realtime/plumber/DefaultSinkFactory.java | 84 +++++++++++++++++++ .../realtime/plumber/RealtimePlumber.java | 4 +- .../segment/realtime/plumber/SinkFactory.java | 55 ++++++++++++ .../segment/realtime/RealtimeManagerTest.java | 33 ++++++-- .../appenderator/AppenderatorPlumberTest.java | 1 + .../appenderator/AppenderatorTester.java | 1 + ...DefaultOfflineAppenderatorFactoryTest.java | 1 + .../plumber/RealtimePlumberSchoolTest.java | 15 ++-- .../segment/realtime/plumber/SinkTest.java | 25 +++++- .../cli/validate/DruidJsonValidatorTest.java | 1 + 25 files changed, 295 insertions(+), 40 deletions(-) create mode 100644 server/src/main/java/io/druid/segment/realtime/plumber/DefaultSinkFactory.java create mode 100644 server/src/main/java/io/druid/segment/realtime/plumber/SinkFactory.java diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java index e9dc4463f80e..4f28433ebc6e 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -25,6 +25,7 @@ import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.TuningConfig; import io.druid.segment.realtime.appenderator.AppenderatorConfig; +import io.druid.segment.realtime.plumber.SinkFactory; import org.joda.time.Period; import java.io.File; @@ -44,6 +45,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig @Deprecated private final long handoffConditionTimeout; private final boolean resetOffsetAutomatically; + private final SinkFactory sinkFactory; @JsonCreator public KafkaTuningConfig( @@ -57,7 +59,8 @@ public KafkaTuningConfig( @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, - @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically + @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically, + @JsonProperty("sinkFactory") SinkFactory sinkFactory ) { // Cannot be a static because default basePersistDirectory is unique per-instance @@ -80,6 +83,7 @@ public KafkaTuningConfig( this.resetOffsetAutomatically = resetOffsetAutomatically == null ? DEFAULT_RESET_OFFSET_AUTOMATICALLY : resetOffsetAutomatically; + this.sinkFactory = sinkFactory == null ? defaults.getSinkFactory() : sinkFactory; } public static KafkaTuningConfig copyOf(KafkaTuningConfig config) @@ -94,7 +98,8 @@ public static KafkaTuningConfig copyOf(KafkaTuningConfig config) true, config.reportParseExceptions, config.handoffConditionTimeout, - config.resetOffsetAutomatically + config.resetOffsetAutomatically, + config.sinkFactory ); } @@ -125,6 +130,13 @@ public File getBasePersistDirectory() return basePersistDirectory; } + @Override + @JsonProperty + public SinkFactory getSinkFactory() + { + return sinkFactory; + } + @Override @JsonProperty public int getMaxPendingPersists() @@ -181,7 +193,8 @@ public KafkaTuningConfig withBasePersistDirectory(File dir) true, reportParseExceptions, handoffConditionTimeout, - resetOffsetAutomatically + resetOffsetAutomatically, + sinkFactory ); } @@ -197,7 +210,8 @@ public KafkaTuningConfig withMaxRowsInMemory(int rows) true, reportParseExceptions, handoffConditionTimeout, - resetOffsetAutomatically + resetOffsetAutomatically, + sinkFactory ); } @@ -241,8 +255,10 @@ public boolean equals(Object o) : that.basePersistDirectory != null) { return false; } - return indexSpec != null ? indexSpec.equals(that.indexSpec) : that.indexSpec == null; - + if (indexSpec != null ? !indexSpec.equals(that.indexSpec) : that.indexSpec != null) { + return false; + } + return sinkFactory != null ? sinkFactory.equals(that.sinkFactory) : that.sinkFactory == null; } @Override @@ -257,6 +273,7 @@ public int hashCode() result = 31 * result + (reportParseExceptions ? 1 : 0); result = 31 * result + (int) (handoffConditionTimeout ^ (handoffConditionTimeout >>> 32)); result = 31 * result + (resetOffsetAutomatically ? 1 : 0); + result = 31 * result + (sinkFactory != null ? sinkFactory.hashCode() : 0); return result; } @@ -273,6 +290,7 @@ public String toString() ", reportParseExceptions=" + reportParseExceptions + ", handoffConditionTimeout=" + handoffConditionTimeout + ", resetOffsetAutomatically=" + resetOffsetAutomatically + + ", sinkFactory=" + sinkFactory + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index c476b05e1053..a8a1b7556a48 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -88,6 +88,7 @@ public KafkaSupervisorSpec( null, null, null, + null, null ); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index ef83165e8ebd..afbd8aac065f 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.indexing.kafka.KafkaTuningConfig; import io.druid.segment.IndexSpec; +import io.druid.segment.realtime.plumber.SinkFactory; import org.joda.time.Duration; import org.joda.time.Period; @@ -48,6 +49,7 @@ public KafkaSupervisorTuningConfig( @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, // for backward compatibility @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically, + @JsonProperty("sinkFactory") SinkFactory sinkFactory, @JsonProperty("workerThreads") Integer workerThreads, @JsonProperty("chatThreads") Integer chatThreads, @JsonProperty("chatRetries") Long chatRetries, @@ -68,7 +70,8 @@ public KafkaSupervisorTuningConfig( // Supervised kafka tasks should respect KafkaSupervisorIOConfig.completionTimeout instead of // handoffConditionTimeout handoffConditionTimeout, - resetOffsetAutomatically + resetOffsetAutomatically, + sinkFactory ); this.workerThreads = workerThreads; @@ -128,6 +131,7 @@ public String toString() ", reportParseExceptions=" + isReportParseExceptions() + ", handoffConditionTimeout=" + getHandoffConditionTimeout() + ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + + ", sinkFactory=" + getSinkFactory() + ", workerThreads=" + workerThreads + ", chatThreads=" + chatThreads + ", chatRetries=" + chatRetries + diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index fd3761debff0..71e7c0b66f97 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1389,7 +1389,8 @@ private KafkaIndexTask createTask( true, reportParseExceptions, handoffConditionTimeout, - resetOffsetAutomatically + resetOffsetAutomatically, + null ); final KafkaIndexTask task = new KafkaIndexTask( taskId, diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java index 81aa36666836..caa4be08b39b 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java @@ -111,6 +111,7 @@ public void testCopyOf() throws Exception true, true, 5L, + null, null ); KafkaTuningConfig copy = KafkaTuningConfig.copyOf(original); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 058203e71b0b..626a6c78e4cf 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -192,6 +192,7 @@ public void setupTest() throws Exception false, null, null, + null, numThreads, TEST_CHAT_THREADS, TEST_CHAT_RETRIES, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index 08b881f32d52..ed5128b4044e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -98,7 +98,7 @@ public Plumber findPlumber( ) { // There can be only one. - final Sink theSink = new Sink( + final Sink theSink = config.getSinkFactory().create( interval, schema, config.getShardSpec(), diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index ce02f4adf000..e52506bad1a5 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -83,7 +83,9 @@ import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; import io.druid.segment.realtime.plumber.Committers; +import io.druid.segment.realtime.plumber.DefaultSinkFactory; import io.druid.segment.realtime.plumber.NoopSegmentHandoffNotifierFactory; +import io.druid.segment.realtime.plumber.SinkFactory; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.HashBasedNumberedShardSpec; import io.druid.timeline.partition.NoneShardSpec; @@ -593,7 +595,7 @@ public IndexIngestionSpec( this.ioConfig = ioConfig; this.tuningConfig = tuningConfig == null ? - new IndexTuningConfig(null, null, null, null, null, null, null, null, (File) null) + new IndexTuningConfig(null, null, null, null, null, null, null, null, (File) null, null) : tuningConfig; } @@ -660,6 +662,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private static final boolean DEFAULT_FORCE_EXTENDABLE_SHARD_SPECS = false; private static final boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = false; private static final long DEFAULT_PUBLISH_TIMEOUT = 0; + private static final SinkFactory DEFAULT_SINK_FACTORY = new DefaultSinkFactory(); static final int DEFAULT_TARGET_PARTITION_SIZE = 5000000; @@ -672,6 +675,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private final boolean forceExtendableShardSpecs; private final boolean reportParseExceptions; private final long publishTimeout; + private final SinkFactory sinkFactory; @JsonCreator public IndexTuningConfig( @@ -685,7 +689,8 @@ public IndexTuningConfig( @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs, @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, - @JsonProperty("publishTimeout") @Nullable Long publishTimeout + @JsonProperty("publishTimeout") @Nullable Long publishTimeout, + @JsonProperty("sinkFactory") @Nullable SinkFactory sinkFactory ) { this( @@ -697,7 +702,8 @@ public IndexTuningConfig( forceExtendableShardSpecs, reportParseExceptions, publishTimeout, - null + null, + sinkFactory ); } @@ -710,7 +716,8 @@ private IndexTuningConfig( @Nullable Boolean forceExtendableShardSpecs, @Nullable Boolean reportParseExceptions, @Nullable Long publishTimeout, - @Nullable File basePersistDirectory + @Nullable File basePersistDirectory, + @Nullable SinkFactory sinkFactory ) { Preconditions.checkArgument( @@ -735,6 +742,7 @@ private IndexTuningConfig( : reportParseExceptions; this.publishTimeout = publishTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : publishTimeout; this.basePersistDirectory = basePersistDirectory; + this.sinkFactory = sinkFactory == null ? DEFAULT_SINK_FACTORY : sinkFactory; } public IndexTuningConfig withBasePersistDirectory(File dir) @@ -748,7 +756,8 @@ public IndexTuningConfig withBasePersistDirectory(File dir) forceExtendableShardSpecs, reportParseExceptions, publishTimeout, - dir + dir, + sinkFactory ); } @@ -784,6 +793,13 @@ public File getBasePersistDirectory() return basePersistDirectory; } + @JsonProperty + @Override + public SinkFactory getSinkFactory() + { + return sinkFactory; + } + @JsonProperty @Override public int getMaxPendingPersists() diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 22fddad373db..26cd21a253ad 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -749,6 +749,7 @@ private IndexTask.IndexIngestionSpec createIngestionSpec( true, forceExtendableShardSpecs, reportParseException, + null, null ) ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 82d09be4c44d..16ae03f34368 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -892,6 +892,7 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportPa 0, reportParseExceptions, handoffTimeout, + null, null ); return new RealtimeIndexTask( diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 5a3161dec802..6ddd7c636131 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -185,7 +185,7 @@ public void testIndexTaskSerde() throws Exception jsonMapper ), new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, 9999, null, indexSpec, 3, true, true, true, null) + new IndexTask.IndexTuningConfig(10000, 10, 9999, null, indexSpec, 3, true, true, true, null, null) ), null, jsonMapper @@ -248,7 +248,7 @@ public void testIndexTaskwithResourceSerde() throws Exception jsonMapper ), new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null, null) ), null, jsonMapper @@ -499,6 +499,7 @@ public Plumber findPlumber( 0, true, null, + null, null ) ), diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 64d0ca48c3d3..3fc9d01b406e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -652,7 +652,7 @@ public void testIndexTask() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null, null) ), null, MAPPER @@ -710,7 +710,7 @@ public void testIndexTaskFailure() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null, null) ), null, MAPPER @@ -1075,7 +1075,7 @@ public void testResumeTasks() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, null, false, null, null, null) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, null, false, null, null, null, null) ), null, MAPPER @@ -1197,6 +1197,7 @@ private RealtimeIndexTask newRealtimeIndexTask() 0, null, null, + null, null ); FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index 5338055a4d8b..c671b5128926 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -25,9 +25,11 @@ import com.google.common.io.Files; import io.druid.segment.IndexSpec; import io.druid.segment.realtime.appenderator.AppenderatorConfig; +import io.druid.segment.realtime.plumber.DefaultSinkFactory; import io.druid.segment.realtime.plumber.IntervalStartVersioningPolicy; import io.druid.segment.realtime.plumber.RejectionPolicyFactory; import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory; +import io.druid.segment.realtime.plumber.SinkFactory; import io.druid.segment.realtime.plumber.VersioningPolicy; import io.druid.timeline.partition.NoneShardSpec; import io.druid.timeline.partition.ShardSpec; @@ -50,6 +52,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig private static final Boolean defaultReportParseExceptions = Boolean.FALSE; private static final long defaultHandoffConditionTimeout = 0; private static final long defaultAlertTimeout = 0; + private static final SinkFactory defaultSinkFactory = new DefaultSinkFactory(); private static File createNewBasePersistDirectory() { @@ -74,7 +77,8 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis 0, defaultReportParseExceptions, defaultHandoffConditionTimeout, - defaultAlertTimeout + defaultAlertTimeout, + defaultSinkFactory ); } @@ -92,6 +96,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis private final boolean reportParseExceptions; private final long handoffConditionTimeout; private final long alertTimeout; + private final SinkFactory sinkFactory; @JsonCreator public RealtimeTuningConfig( @@ -110,7 +115,8 @@ public RealtimeTuningConfig( @JsonProperty("mergeThreadPriority") int mergeThreadPriority, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, - @JsonProperty("alertTimeout") Long alertTimeout + @JsonProperty("alertTimeout") Long alertTimeout, + @JsonProperty("sinkFactory") SinkFactory sinkFactory ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; @@ -138,6 +144,7 @@ public RealtimeTuningConfig( this.alertTimeout = alertTimeout == null ? defaultAlertTimeout : alertTimeout; Preconditions.checkArgument(this.alertTimeout >= 0, "alertTimeout must be >= 0"); + this.sinkFactory = sinkFactory == null ? defaultSinkFactory : sinkFactory; } @Override @@ -240,6 +247,13 @@ public long getAlertTimeout() return alertTimeout; } + @Override + @JsonProperty + public SinkFactory getSinkFactory() + { + return sinkFactory; + } + public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -257,7 +271,8 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) mergeThreadPriority, reportParseExceptions, handoffConditionTimeout, - alertTimeout + alertTimeout, + sinkFactory ); } @@ -278,7 +293,8 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir) mergeThreadPriority, reportParseExceptions, handoffConditionTimeout, - alertTimeout + alertTimeout, + sinkFactory ); } } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java index 4b2c99f644c0..aa813daae422 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -20,6 +20,7 @@ package io.druid.segment.realtime.appenderator; import io.druid.segment.IndexSpec; +import io.druid.segment.realtime.plumber.SinkFactory; import org.joda.time.Period; import java.io.File; @@ -37,4 +38,6 @@ public interface AppenderatorConfig IndexSpec getIndexSpec(); File getBasePersistDirectory(); + + SinkFactory getSinkFactory(); } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 4e3062a5d0cf..e3f9145945e6 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -254,7 +254,7 @@ private Sink getOrCreateSink(final SegmentIdentifier identifier) Sink retVal = sinks.get(identifier); if (retVal == null) { - retVal = new Sink( + retVal = tuningConfig.getSinkFactory().create( identifier.getInterval(), schema, identifier.getShardSpec(), @@ -805,7 +805,7 @@ public int compare(File o1, File o2) throw new ISE("Missing hydrant [%,d] in sinkDir [%s].", hydrants.size(), sinkDir); } - Sink currSink = new Sink( + Sink currSink = tuningConfig.getSinkFactory().create( identifier.getInterval(), schema, identifier.getShardSpec(), diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/DefaultSinkFactory.java b/server/src/main/java/io/druid/segment/realtime/plumber/DefaultSinkFactory.java new file mode 100644 index 000000000000..b7a99f9efb65 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/DefaultSinkFactory.java @@ -0,0 +1,84 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.plumber; + +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.realtime.FireHydrant; +import io.druid.timeline.partition.ShardSpec; +import org.joda.time.Interval; + +import java.util.List; + +public class DefaultSinkFactory implements SinkFactory +{ + @Override + public Sink create( + Interval interval, + DataSchema schema, + ShardSpec shardSpec, + String version, + int maxRowsInMemory, + boolean reportParseExceptions + ) + { + return new Sink( + interval, + schema, + shardSpec, + version, + maxRowsInMemory, + reportParseExceptions + ); + } + + @Override + public Sink create( + Interval interval, + DataSchema schema, + ShardSpec shardSpec, + String version, + int maxRowsInMemory, + boolean reportParseExceptions, + List hydrants + ) + { + return new Sink( + interval, + schema, + shardSpec, + version, + maxRowsInMemory, + reportParseExceptions, + hydrants + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + return true; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 1e1b3cb18092..477ad8f3faa5 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -239,7 +239,7 @@ private Sink getSink(long timestamp) segmentGranularity.increment(new DateTime(truncatedTime)) ); - retVal = new Sink( + retVal = config.getSinkFactory().create( sinkInterval, schema, config.getShardSpec(), @@ -700,7 +700,7 @@ public int compare(File o1, File o2) ); continue; } - final Sink currSink = new Sink( + final Sink currSink = config.getSinkFactory().create( sinkInterval, schema, config.getShardSpec(), diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/SinkFactory.java b/server/src/main/java/io/druid/segment/realtime/plumber/SinkFactory.java new file mode 100644 index 000000000000..d537a9c59aab --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/plumber/SinkFactory.java @@ -0,0 +1,55 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.plumber; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.realtime.FireHydrant; +import io.druid.timeline.partition.ShardSpec; +import org.joda.time.Interval; + +import java.util.List; + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = DefaultSinkFactory.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "default", value = DefaultSinkFactory.class) +}) +public interface SinkFactory +{ + public Sink create( + Interval interval, + DataSchema schema, + ShardSpec shardSpec, + String version, + int maxRowsInMemory, + boolean reportParseExceptions + ); + + public Sink create( + Interval interval, + DataSchema schema, + ShardSpec shardSpec, + String version, + int maxRowsInMemory, + boolean reportParseExceptions, + List hydrants + ); +} diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 85e60f3ba616..c25e8b04d2bc 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -68,9 +68,11 @@ import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.realtime.plumber.DefaultSinkFactory; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.segment.realtime.plumber.Sink; +import io.druid.segment.realtime.plumber.SinkFactory; import io.druid.server.coordination.DataSegmentServerAnnouncer; import io.druid.timeline.partition.LinearShardSpec; import io.druid.utils.Runnables; @@ -83,11 +85,14 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -96,6 +101,7 @@ /** */ +@RunWith(Parameterized.class) public class RealtimeManagerTest { private static QueryRunnerFactory factory; @@ -108,6 +114,7 @@ public class RealtimeManagerTest makeRow(new DateTime().getMillis()) ); + private SinkFactory sinkFactory; private RealtimeManager realtimeManager; private RealtimeManager realtimeManager2; private RealtimeManager realtimeManager3; @@ -133,6 +140,19 @@ public > QueryRunnerFactory findFact }; } + @Parameterized.Parameters(name = "sinkFactory = {0}") + public static Collection constructorFeeder() throws IOException + { + final List constructors = Lists.newArrayList(); + constructors.add(new Object[]{new DefaultSinkFactory()}); + return constructors; + } + + public RealtimeManagerTest(SinkFactory sinkFactory) + { + this.sinkFactory = sinkFactory; + } + @Before public void setUp() throws Exception { @@ -209,9 +229,10 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException 0, null, null, - null + null, + sinkFactory ); - plumber = new TestPlumber(new Sink( + plumber = new TestPlumber(tuningConfig.getSinkFactory().create( new Interval("0/P5000Y"), schema, tuningConfig.getShardSpec(), @@ -231,7 +252,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException null, EasyMock.createNiceMock(DataSegmentServerAnnouncer.class) ); - plumber2 = new TestPlumber(new Sink( + plumber2 = new TestPlumber(tuningConfig.getSinkFactory().create( new Interval("0/P5000Y"), schema2, tuningConfig.getShardSpec(), @@ -267,7 +288,8 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException 0, null, null, - null + null, + sinkFactory ); tuningConfig_1 = new RealtimeTuningConfig( @@ -285,7 +307,8 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException 0, null, null, - null + null, + sinkFactory ); schema3 = new DataSchema( diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index 16484c48cdae..667d20374849 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -81,6 +81,7 @@ EasyMock. anyObject(), 0, false, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index 71fd3714d23c..4f52a8e46f5d 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -137,6 +137,7 @@ public AppenderatorTester( 0, null, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index bd7c2b418cd7..3528ad5f5461 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -144,6 +144,7 @@ public int columnCacheSizeBytes() 0, null, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 2c50bcddb406..eef737c3a304 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -82,6 +82,7 @@ public class RealtimePlumberSchoolTest { private final RejectionPolicyFactory rejectionPolicy; + private final SinkFactory sinkFactory; private RealtimePlumber plumber; private RealtimePlumberSchool realtimePlumberSchool; private DataSegmentAnnouncer announcer; @@ -96,9 +97,10 @@ public class RealtimePlumberSchoolTest private FireDepartmentMetrics metrics; private File tmpDir; - public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy) + public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy, SinkFactory sinkFactory) { this.rejectionPolicy = rejectionPolicy; + this.sinkFactory = sinkFactory; } @Parameterized.Parameters(name = "rejectionPolicy = {0}") @@ -111,7 +113,7 @@ public static Collection constructorFeeder() throws IOException final List constructors = Lists.newArrayList(); for (RejectionPolicyFactory rejectionPolicy : rejectionPolicies) { - constructors.add(new Object[]{rejectionPolicy}); + constructors.add(new Object[]{rejectionPolicy, new DefaultSinkFactory()}); } return constructors; } @@ -199,7 +201,8 @@ public void setUp() throws Exception 0, false, null, - null + null, + sinkFactory ); realtimePlumberSchool = new RealtimePlumberSchool( @@ -255,7 +258,7 @@ private void testPersist(final Object commitMetadata) throws Exception plumber.getSinks() .put( 0L, - new Sink( + tuningConfig.getSinkFactory().create( new Interval(0, TimeUnit.HOURS.toMillis(1)), schema, tuningConfig.getShardSpec(), @@ -302,7 +305,7 @@ public void testPersistFails() throws Exception plumber.getSinks() .put( 0L, - new Sink( + tuningConfig.getSinkFactory().create( new Interval(0, TimeUnit.HOURS.toMillis(1)), schema, tuningConfig.getShardSpec(), @@ -359,7 +362,7 @@ private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Ex plumber2.getSinks() .put( 0L, - new Sink( + tuningConfig.getSinkFactory().create( testInterval, schema2, tuningConfig.getShardSpec(), diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index d6bbd1ab587a..0ea274b7c685 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -36,13 +36,33 @@ import org.joda.time.Period; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import java.io.IOException; +import java.util.Collection; import java.util.List; /** */ +@RunWith(Parameterized.class) public class SinkTest { + private SinkFactory sinkFactory; + + @Parameterized.Parameters(name = "sinkFactory = {0}") + public static Collection constructorFeeder() throws IOException + { + final List constructors = Lists.newArrayList(); + constructors.add(new Object[]{new DefaultSinkFactory()}); + return constructors; + } + + public SinkTest(SinkFactory sinkFactory) + { + this.sinkFactory = sinkFactory; + } + @Test public void testSwap() throws Exception { @@ -71,9 +91,10 @@ public void testSwap() throws Exception 0, null, null, - null + null, + sinkFactory ); - final Sink sink = new Sink( + final Sink sink = tuningConfig.getSinkFactory().create( interval, schema, tuningConfig.getShardSpec(), diff --git a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java index b83a93bdbe9d..4ea9acde55f2 100644 --- a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java @@ -181,6 +181,7 @@ public Plumber findPlumber( 0, true, null, + null, null ) ), From f8896c3309f55cdecf1fe19142ebf69f34466486 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Tue, 27 Jun 2017 11:39:12 +0800 Subject: [PATCH 2/3] introduce customIndexMerger in tunningConfig --- .../indexing/kafka/KafkaTuningConfig.java | 31 ++++++++++++++++--- .../kafka/supervisor/KafkaSupervisorSpec.java | 1 + .../KafkaSupervisorTuningConfig.java | 6 +++- .../indexing/kafka/KafkaIndexTaskTest.java | 1 + .../indexing/kafka/KafkaTuningConfigTest.java | 1 + .../kafka/supervisor/KafkaSupervisorTest.java | 1 + .../common/index/YeOldePlumberSchool.java | 9 ++++-- .../druid/indexing/common/task/IndexTask.java | 27 ++++++++++++---- .../druid/indexing/common/task/MergeTask.java | 5 ++- .../common/task/SameIntervalMergeTask.java | 7 +++++ .../indexing/common/task/IndexTaskTest.java | 1 + .../common/task/RealtimeIndexTaskTest.java | 1 + .../task/SameIntervalMergeTaskTest.java | 1 + .../indexing/common/task/TaskSerdeTest.java | 7 +++-- .../indexing/overlord/TaskLifecycleTest.java | 7 +++-- .../java/io/druid/segment/IndexMerger.java | 6 ++++ .../java/io/druid/segment/IndexMergerV9.java | 7 +++-- .../indexing/RealtimeTuningConfig.java | 22 ++++++++++--- .../appenderator/AppenderatorConfig.java | 3 ++ .../segment/realtime/RealtimeManagerTest.java | 9 ++++-- .../appenderator/AppenderatorPlumberTest.java | 1 + .../appenderator/AppenderatorTester.java | 1 + ...DefaultOfflineAppenderatorFactoryTest.java | 1 + .../plumber/RealtimePlumberSchoolTest.java | 3 +- .../segment/realtime/plumber/SinkTest.java | 3 +- .../cli/validate/DruidJsonValidatorTest.java | 1 + 26 files changed, 132 insertions(+), 31 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java index 4f28433ebc6e..980536c8e1cb 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.TuningConfig; @@ -46,6 +47,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig private final long handoffConditionTimeout; private final boolean resetOffsetAutomatically; private final SinkFactory sinkFactory; + private final IndexMerger customIndexMerger; @JsonCreator public KafkaTuningConfig( @@ -60,7 +62,8 @@ public KafkaTuningConfig( @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically, - @JsonProperty("sinkFactory") SinkFactory sinkFactory + @JsonProperty("sinkFactory") SinkFactory sinkFactory, + @JsonProperty("customIndexMerger") IndexMerger customIndexMerger ) { // Cannot be a static because default basePersistDirectory is unique per-instance @@ -84,6 +87,7 @@ public KafkaTuningConfig( ? DEFAULT_RESET_OFFSET_AUTOMATICALLY : resetOffsetAutomatically; this.sinkFactory = sinkFactory == null ? defaults.getSinkFactory() : sinkFactory; + this.customIndexMerger = customIndexMerger; } public static KafkaTuningConfig copyOf(KafkaTuningConfig config) @@ -99,7 +103,8 @@ public static KafkaTuningConfig copyOf(KafkaTuningConfig config) config.reportParseExceptions, config.handoffConditionTimeout, config.resetOffsetAutomatically, - config.sinkFactory + config.sinkFactory, + config.customIndexMerger ); } @@ -137,6 +142,13 @@ public SinkFactory getSinkFactory() return sinkFactory; } + @Override + @JsonProperty + public IndexMerger getCustomIndexMerger() + { + return customIndexMerger; + } + @Override @JsonProperty public int getMaxPendingPersists() @@ -194,7 +206,8 @@ public KafkaTuningConfig withBasePersistDirectory(File dir) reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, - sinkFactory + sinkFactory, + customIndexMerger ); } @@ -211,7 +224,8 @@ public KafkaTuningConfig withMaxRowsInMemory(int rows) reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, - sinkFactory + sinkFactory, + customIndexMerger ); } @@ -258,7 +272,12 @@ public boolean equals(Object o) if (indexSpec != null ? !indexSpec.equals(that.indexSpec) : that.indexSpec != null) { return false; } - return sinkFactory != null ? sinkFactory.equals(that.sinkFactory) : that.sinkFactory == null; + if (sinkFactory != null ? !sinkFactory.equals(that.sinkFactory) : that.sinkFactory != null) { + return false; + } + return customIndexMerger != null + ? customIndexMerger.equals(that.customIndexMerger) + : that.customIndexMerger == null; } @Override @@ -274,6 +293,7 @@ public int hashCode() result = 31 * result + (int) (handoffConditionTimeout ^ (handoffConditionTimeout >>> 32)); result = 31 * result + (resetOffsetAutomatically ? 1 : 0); result = 31 * result + (sinkFactory != null ? sinkFactory.hashCode() : 0); + result = 31 * result + (customIndexMerger != null ? customIndexMerger.hashCode() : 0); return result; } @@ -291,6 +311,7 @@ public String toString() ", handoffConditionTimeout=" + handoffConditionTimeout + ", resetOffsetAutomatically=" + resetOffsetAutomatically + ", sinkFactory=" + sinkFactory + + ", customIndexMerger=" + customIndexMerger + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index a8a1b7556a48..b3ae9ca16272 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -89,6 +89,7 @@ public KafkaSupervisorSpec( null, null, null, + null, null ); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index afbd8aac065f..73c0d9edd535 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.indexing.kafka.KafkaTuningConfig; +import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.realtime.plumber.SinkFactory; import org.joda.time.Duration; @@ -50,6 +51,7 @@ public KafkaSupervisorTuningConfig( @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, // for backward compatibility @JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically, @JsonProperty("sinkFactory") SinkFactory sinkFactory, + @JsonProperty("customIndexMerger") IndexMerger customIndexMerger, @JsonProperty("workerThreads") Integer workerThreads, @JsonProperty("chatThreads") Integer chatThreads, @JsonProperty("chatRetries") Long chatRetries, @@ -71,7 +73,8 @@ public KafkaSupervisorTuningConfig( // handoffConditionTimeout handoffConditionTimeout, resetOffsetAutomatically, - sinkFactory + sinkFactory, + customIndexMerger ); this.workerThreads = workerThreads; @@ -132,6 +135,7 @@ public String toString() ", handoffConditionTimeout=" + getHandoffConditionTimeout() + ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + ", sinkFactory=" + getSinkFactory() + + ", customIndexMerger=" + getCustomIndexMerger() + ", workerThreads=" + workerThreads + ", chatThreads=" + chatThreads + ", chatRetries=" + chatRetries + diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 71e7c0b66f97..3f929f64f3ba 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1390,6 +1390,7 @@ private KafkaIndexTask createTask( reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, + null, null ); final KafkaIndexTask task = new KafkaIndexTask( diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java index caa4be08b39b..e89ee8dfff46 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java @@ -112,6 +112,7 @@ public void testCopyOf() throws Exception true, 5L, null, + null, null ); KafkaTuningConfig copy = KafkaTuningConfig.copyOf(original); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 626a6c78e4cf..77e18c9a7d0f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -193,6 +193,7 @@ public void setupTest() throws Exception null, null, null, + null, numThreads, TEST_CHAT_THREADS, TEST_CHAT_RETRIES, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index ed5128b4044e..85d5ae9484bb 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -36,6 +36,7 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.segment.IndexIO; +import io.druid.segment.IndexMerger; import io.druid.segment.IndexMergerV9; import io.druid.segment.QueryableIndex; import io.druid.segment.SegmentUtils; @@ -113,6 +114,10 @@ public Plumber findPlumber( // Set of spilled segments. Will be merged at the end. final Set spilled = Sets.newHashSet(); + // IndexMerger implementation. + final IndexMerger theIndexMerger = config.getCustomIndexMerger() != null + ? config.getCustomIndexMerger() : indexMergerV9; + return new Plumber() { @Override @@ -181,7 +186,7 @@ public void finishJob() } fileToUpload = new File(tmpSegmentDir, "merged"); - indexMergerV9.mergeQueryableIndex(indexes, schema.getGranularitySpec().isRollup(), schema.getAggregators(), fileToUpload, config.getIndexSpec()); + theIndexMerger.mergeQueryableIndex(indexes, schema.getGranularitySpec().isRollup(), schema.getAggregators(), fileToUpload, config.getIndexSpec()); } // Map merged segment so we can extract dimensions @@ -226,7 +231,7 @@ private void spillIfSwappable() log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist); try { - indexMergerV9.persist( + theIndexMerger.persist( indexToPersist.getIndex(), dirToPersist, config.getIndexSpec() diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index e52506bad1a5..a69edf6b0709 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -63,6 +63,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.parsers.ParseException; import io.druid.query.DruidMetrics; +import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.IOConfig; @@ -555,7 +556,8 @@ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox.getSegmentPusher(), toolbox.getObjectMapper(), toolbox.getIndexIO(), - toolbox.getIndexMergerV9() + ingestionSchema.getTuningConfig().getCustomIndexMerger() != null + ? ingestionSchema.getTuningConfig().getCustomIndexMerger() : toolbox.getIndexMergerV9() ); } @@ -595,7 +597,7 @@ public IndexIngestionSpec( this.ioConfig = ioConfig; this.tuningConfig = tuningConfig == null ? - new IndexTuningConfig(null, null, null, null, null, null, null, null, (File) null, null) + new IndexTuningConfig(null, null, null, null, null, null, null, null, (File) null, null, null) : tuningConfig; } @@ -676,6 +678,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private final boolean reportParseExceptions; private final long publishTimeout; private final SinkFactory sinkFactory; + private final IndexMerger customIndexMerger; @JsonCreator public IndexTuningConfig( @@ -690,7 +693,8 @@ public IndexTuningConfig( @JsonProperty("forceExtendableShardSpecs") @Nullable Boolean forceExtendableShardSpecs, @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @JsonProperty("publishTimeout") @Nullable Long publishTimeout, - @JsonProperty("sinkFactory") @Nullable SinkFactory sinkFactory + @JsonProperty("sinkFactory") @Nullable SinkFactory sinkFactory, + @JsonProperty("customIndexMerger") @Nullable IndexMerger customIndexMerger ) { this( @@ -703,7 +707,8 @@ public IndexTuningConfig( reportParseExceptions, publishTimeout, null, - sinkFactory + sinkFactory, + customIndexMerger ); } @@ -717,7 +722,8 @@ private IndexTuningConfig( @Nullable Boolean reportParseExceptions, @Nullable Long publishTimeout, @Nullable File basePersistDirectory, - @Nullable SinkFactory sinkFactory + @Nullable SinkFactory sinkFactory, + @Nullable IndexMerger customIndexMerger ) { Preconditions.checkArgument( @@ -743,6 +749,7 @@ private IndexTuningConfig( this.publishTimeout = publishTimeout == null ? DEFAULT_PUBLISH_TIMEOUT : publishTimeout; this.basePersistDirectory = basePersistDirectory; this.sinkFactory = sinkFactory == null ? DEFAULT_SINK_FACTORY : sinkFactory; + this.customIndexMerger = customIndexMerger; } public IndexTuningConfig withBasePersistDirectory(File dir) @@ -757,7 +764,8 @@ public IndexTuningConfig withBasePersistDirectory(File dir) reportParseExceptions, publishTimeout, dir, - sinkFactory + sinkFactory, + customIndexMerger ); } @@ -800,6 +808,13 @@ public SinkFactory getSinkFactory() return sinkFactory; } + @JsonProperty + @Override + public IndexMerger getCustomIndexMerger() + { + return customIndexMerger; + } + @JsonProperty @Override public int getMaxPendingPersists() diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java index d897df5c089f..d5c8a63d50b5 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTask.java @@ -47,6 +47,7 @@ public class MergeTask extends MergeTaskBase private final List aggregators; private final Boolean rollup; private final IndexSpec indexSpec; + private final IndexMerger customIndexMerger; @JsonCreator public MergeTask( @@ -56,6 +57,7 @@ public MergeTask( @JsonProperty("aggregations") List aggregators, @JsonProperty("rollup") Boolean rollup, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("customIndexMerger") IndexMerger customIndexMerger, // This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("context") Map context @@ -65,13 +67,14 @@ public MergeTask( this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations"); this.rollup = rollup == null ? Boolean.TRUE : rollup; this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; + this.customIndexMerger = customIndexMerger; } @Override public File merge(final TaskToolbox toolbox, final Map segments, final File outDir) throws Exception { - IndexMerger indexMerger = toolbox.getIndexMergerV9(); + IndexMerger indexMerger = customIndexMerger != null ? customIndexMerger : toolbox.getIndexMergerV9(); return indexMerger.mergeQueryableIndex( Lists.transform( ImmutableList.copyOf(segments.values()), diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java index 3a112e83ebb6..583c1669ffa2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/SameIntervalMergeTask.java @@ -26,6 +26,7 @@ import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.SegmentListUsedAction; import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.timeline.DataSegment; import org.joda.time.DateTime; @@ -43,6 +44,7 @@ public class SameIntervalMergeTask extends AbstractFixedIntervalTask private final List aggregators; private final Boolean rollup; private final IndexSpec indexSpec; + private final IndexMerger customIndexMerger; public SameIntervalMergeTask( @JsonProperty("id") String id, @@ -51,6 +53,7 @@ public SameIntervalMergeTask( @JsonProperty("aggregations") List aggregators, @JsonProperty("rollup") Boolean rollup, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("customIndexMerger") IndexMerger customIndexMerger, // This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("context") Map context @@ -65,6 +68,7 @@ public SameIntervalMergeTask( this.aggregators = Preconditions.checkNotNull(aggregators, "null aggregations"); this.rollup = rollup == null ? Boolean.TRUE : rollup; this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec; + this.customIndexMerger = customIndexMerger; } @JsonProperty("aggregations") @@ -129,6 +133,7 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception aggregators, rollup, indexSpec, + customIndexMerger, getContext() ); final TaskStatus status = mergeTask.run(toolbox); @@ -147,6 +152,7 @@ public SubTask( List aggregators, Boolean rollup, IndexSpec indexSpec, + IndexMerger customIndexMerger, Map context ) { @@ -157,6 +163,7 @@ public SubTask( aggregators, rollup, indexSpec, + customIndexMerger, true, context ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 26cd21a253ad..80eea4db2810 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -750,6 +750,7 @@ private IndexTask.IndexIngestionSpec createIngestionSpec( forceExtendableShardSpecs, reportParseException, null, + null, null ) ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 16ae03f34368..6ea36807691a 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -893,6 +893,7 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportPa reportParseExceptions, handoffTimeout, null, + null, null ); return new RealtimeIndexTask( diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java index 790349eb01ea..77d38cefd0ed 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java @@ -89,6 +89,7 @@ public void testRun() throws Exception aggregators, true, indexSpec, + null, true, null ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 6ddd7c636131..ceaace2dfa7d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -185,7 +185,7 @@ public void testIndexTaskSerde() throws Exception jsonMapper ), new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, 9999, null, indexSpec, 3, true, true, true, null, null) + new IndexTask.IndexTuningConfig(10000, 10, 9999, null, indexSpec, 3, true, true, true, null, null, null) ), null, jsonMapper @@ -248,7 +248,7 @@ public void testIndexTaskwithResourceSerde() throws Exception jsonMapper ), new IndexTask.IndexIOConfig(new LocalFirehoseFactory(new File("lol"), "rofl", null), true), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null, null) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null, null, null) ), null, jsonMapper @@ -294,6 +294,7 @@ public void testMergeTaskSerde() throws Exception aggregators, true, indexSpec, + null, true, null ); @@ -343,6 +344,7 @@ public void testSameIntervalMergeTaskSerde() throws Exception aggregators, true, indexSpec, + null, true, null ); @@ -500,6 +502,7 @@ public Plumber findPlumber( true, null, null, + null, null ) ), diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 3fc9d01b406e..9b0293c40f62 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -652,7 +652,7 @@ public void testIndexTask() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null, null) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null, null, null) ), null, MAPPER @@ -710,7 +710,7 @@ public void testIndexTaskFailure() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockExceptionalFirehoseFactory(), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null, null) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, 3, true, true, true, null, null, null) ), null, MAPPER @@ -1075,7 +1075,7 @@ public void testResumeTasks() throws Exception mapper ), new IndexTask.IndexIOConfig(new MockFirehoseFactory(false), false), - new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, null, false, null, null, null, null) + new IndexTask.IndexTuningConfig(10000, 10, null, null, indexSpec, null, false, null, null, null, null, null) ), null, MAPPER @@ -1198,6 +1198,7 @@ private RealtimeIndexTask newRealtimeIndexTask() null, null, null, + null, null ); FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index 88b33b381be6..52c2e6097401 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -19,6 +19,8 @@ package io.druid.segment; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; import com.google.common.base.Function; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; @@ -58,6 +60,10 @@ import java.util.Set; import java.util.stream.Collectors; +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = IndexMergerV9.class) +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "v9", value = IndexMergerV9.class) +}) @ImplementedBy(IndexMergerV9.class) public interface IndexMerger { diff --git a/processing/src/main/java/io/druid/segment/IndexMergerV9.java b/processing/src/main/java/io/druid/segment/IndexMergerV9.java index ec340e2ba629..73396e546147 100644 --- a/processing/src/main/java/io/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/io/druid/segment/IndexMergerV9.java @@ -19,6 +19,8 @@ package io.druid.segment; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.base.Preconditions; @@ -93,9 +95,10 @@ public class IndexMergerV9 implements IndexMerger protected final IndexIO indexIO; @Inject + @JsonCreator public IndexMergerV9( - ObjectMapper mapper, - IndexIO indexIO + @JacksonInject ObjectMapper mapper, + @JacksonInject IndexIO indexIO ) { this.mapper = Preconditions.checkNotNull(mapper, "null ObjectMapper"); diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index c671b5128926..89456592cb00 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.io.Files; +import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.realtime.appenderator.AppenderatorConfig; import io.druid.segment.realtime.plumber.DefaultSinkFactory; @@ -78,7 +79,8 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis defaultReportParseExceptions, defaultHandoffConditionTimeout, defaultAlertTimeout, - defaultSinkFactory + defaultSinkFactory, + null ); } @@ -97,6 +99,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final File basePersis private final long handoffConditionTimeout; private final long alertTimeout; private final SinkFactory sinkFactory; + private final IndexMerger customIndexMerger; @JsonCreator public RealtimeTuningConfig( @@ -116,7 +119,8 @@ public RealtimeTuningConfig( @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @JsonProperty("alertTimeout") Long alertTimeout, - @JsonProperty("sinkFactory") SinkFactory sinkFactory + @JsonProperty("sinkFactory") SinkFactory sinkFactory, + @JsonProperty("customIndexMerger") IndexMerger customIndexMerger ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; @@ -145,6 +149,7 @@ public RealtimeTuningConfig( this.alertTimeout = alertTimeout == null ? defaultAlertTimeout : alertTimeout; Preconditions.checkArgument(this.alertTimeout >= 0, "alertTimeout must be >= 0"); this.sinkFactory = sinkFactory == null ? defaultSinkFactory : sinkFactory; + this.customIndexMerger = customIndexMerger; } @Override @@ -254,6 +259,13 @@ public SinkFactory getSinkFactory() return sinkFactory; } + @Override + @JsonProperty + public IndexMerger getCustomIndexMerger() + { + return customIndexMerger; + } + public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -272,7 +284,8 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) reportParseExceptions, handoffConditionTimeout, alertTimeout, - sinkFactory + sinkFactory, + customIndexMerger ); } @@ -294,7 +307,8 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir) reportParseExceptions, handoffConditionTimeout, alertTimeout, - sinkFactory + sinkFactory, + customIndexMerger ); } } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java index aa813daae422..25911a022bc4 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -19,6 +19,7 @@ package io.druid.segment.realtime.appenderator; +import io.druid.segment.IndexMerger; import io.druid.segment.IndexSpec; import io.druid.segment.realtime.plumber.SinkFactory; import org.joda.time.Period; @@ -40,4 +41,6 @@ public interface AppenderatorConfig File getBasePersistDirectory(); SinkFactory getSinkFactory(); + + IndexMerger getCustomIndexMerger(); } diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index c25e8b04d2bc..d790362855e6 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -230,7 +230,8 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException null, null, null, - sinkFactory + sinkFactory, + null ); plumber = new TestPlumber(tuningConfig.getSinkFactory().create( new Interval("0/P5000Y"), @@ -289,7 +290,8 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException null, null, null, - sinkFactory + sinkFactory, + null ); tuningConfig_1 = new RealtimeTuningConfig( @@ -308,7 +310,8 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException null, null, null, - sinkFactory + sinkFactory, + null ); schema3 = new DataSchema( diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index 667d20374849..b82707e18647 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -82,6 +82,7 @@ EasyMock. anyObject(), false, null, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index 4f52a8e46f5d..2c7c97d6a9af 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -138,6 +138,7 @@ public AppenderatorTester( null, null, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index 3528ad5f5461..29c261971999 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -145,6 +145,7 @@ public int columnCacheSizeBytes() null, null, null, + null, null ); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index eef737c3a304..e55d8f611e4b 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -202,7 +202,8 @@ public void setUp() throws Exception false, null, null, - sinkFactory + sinkFactory, + null ); realtimePlumberSchool = new RealtimePlumberSchool( diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index 0ea274b7c685..e6d51e706b8b 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -92,7 +92,8 @@ public void testSwap() throws Exception null, null, null, - sinkFactory + sinkFactory, + null ); final Sink sink = tuningConfig.getSinkFactory().create( interval, diff --git a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java index 4ea9acde55f2..14b2b2151182 100644 --- a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java @@ -182,6 +182,7 @@ public Plumber findPlumber( true, null, null, + null, null ) ), From e3256fc6fd57a14ece4314ab6fc29ffa21a80365 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Thu, 29 Jun 2017 02:47:12 +0800 Subject: [PATCH 3/3] ability to load customized segment format --- .../IngestSegmentFirehoseFactoryTest.java | 11 +++ .../common/io/smoosh/SmooshedFileMapper.java | 2 +- .../main/java/io/druid/segment/IndexIO.java | 53 +++++++++++--- .../segment/StringDimensionMergerV9.java | 72 +++++++++++++++---- .../MMappedQueryableSegmentizerFactory.java | 9 ++- .../segment/loading/SegmentizerFactory.java | 4 ++ .../aggregation/AggregationTestHelper.java | 2 +- .../java/io/druid/segment/TestHelper.java | 25 +++---- .../SegmentLoaderLocalCacheManager.java | 21 ++---- .../appenderator/AppenderatorTester.java | 6 ++ 10 files changed, 150 insertions(+), 55 deletions(-) diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 1341e2f07062..74491285c40e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -62,6 +62,7 @@ import io.druid.segment.IndexIO; import io.druid.segment.IndexMergerV9; import io.druid.segment.IndexSpec; +import io.druid.segment.column.ColumnConfig; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.loading.DataSegmentArchiver; @@ -373,6 +374,16 @@ public static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMap public void configure(Binder binder) { binder.bind(LocalDataSegmentPuller.class); + binder.bind(ColumnConfig.class).toInstance( + new ColumnConfig() + { + @Override + public int columnCacheSizeBytes() + { + return 0; + } + } + ); } } ) diff --git a/java-util/src/main/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapper.java b/java-util/src/main/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapper.java index 1aee6aa1742e..7a8e6dbec511 100644 --- a/java-util/src/main/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapper.java +++ b/java-util/src/main/java/io/druid/java/util/common/io/smoosh/SmooshedFileMapper.java @@ -97,7 +97,7 @@ public static SmooshedFileMapper load(File baseDir) throws IOException private final Map internalFiles; private final List buffersList = Lists.newArrayList(); - private SmooshedFileMapper( + public SmooshedFileMapper( List outFiles, Map internalFiles ) diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java index 0ea368d945a0..829011106a3a 100644 --- a/processing/src/main/java/io/druid/segment/IndexIO.java +++ b/processing/src/main/java/io/druid/segment/IndexIO.java @@ -58,6 +58,8 @@ import io.druid.segment.data.IndexedMultivalue; import io.druid.segment.data.IndexedRTree; import io.druid.segment.data.VSizeIndexed; +import io.druid.segment.loading.MMappedQueryableSegmentizerFactory; +import io.druid.segment.loading.SegmentizerFactory; import io.druid.segment.serde.BitmapIndexColumnPartSupplier; import io.druid.segment.serde.ComplexColumnPartSupplier; import io.druid.segment.serde.DictionaryEncodedColumnSupplier; @@ -91,12 +93,13 @@ public class IndexIO private static final SerializerUtils serializerUtils = new SerializerUtils(); private final ObjectMapper mapper; + private final ColumnConfig columnConfig; @Inject public IndexIO(ObjectMapper mapper, ColumnConfig columnConfig) { this.mapper = Preconditions.checkNotNull(mapper, "null ObjectMapper"); - Preconditions.checkNotNull(columnConfig, "null ColumnConfig"); + this.columnConfig = Preconditions.checkNotNull(columnConfig, "null ColumnConfig"); ImmutableMap.Builder indexLoadersBuilder = ImmutableMap.builder(); LegacyIndexLoader legacyIndexLoader = new LegacyIndexLoader(new DefaultIndexIOHandler(), columnConfig); for (int i = 0; i <= V8_VERSION; i++) { @@ -106,6 +109,11 @@ public IndexIO(ObjectMapper mapper, ColumnConfig columnConfig) indexLoaders = indexLoadersBuilder.build(); } + public ColumnConfig getColumnConfig() + { + return columnConfig; + } + public void validateTwoSegments(File dir1, File dir2) throws IOException { try (QueryableIndex queryableIndex1 = loadIndex(dir1)) { @@ -180,6 +188,11 @@ public void validateTwoSegments(final IndexableAdapter adapter1, final Indexable } public QueryableIndex loadIndex(File inDir) throws IOException + { + return getSegmentizerFactory(inDir).loadIndex(inDir); + } + + public QueryableIndex loadIndexDirectly(File inDir) throws IOException { final int version = SegmentUtils.getVersionFromDir(inDir); @@ -192,6 +205,25 @@ public QueryableIndex loadIndex(File inDir) throws IOException } } + public SegmentizerFactory getSegmentizerFactory(File segmentDir) throws IOException + { + File factoryJson = new File(segmentDir, "factory.json"); + final SegmentizerFactory factory; + + if (factoryJson.exists()) { + factory = mapper.readValue(factoryJson, SegmentizerFactory.class); + } else { + factory = new MMappedQueryableSegmentizerFactory(this); + } + + return factory; + } + + public IndexLoader getIndexLoader(int version) throws IOException + { + return indexLoaders.get(version); + } + public static int getVersionFromDir(File inDir) throws IOException { File versionFile = new File(inDir, "version.bin"); @@ -410,7 +442,7 @@ public MMappedIndex mapDir(File inDir) throws IOException } - static interface IndexLoader + public static interface IndexLoader { public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException; } @@ -519,7 +551,7 @@ Column.TIME_COLUMN_NAME, new ColumnBuilder() } } - static class V9IndexLoader implements IndexLoader + public static class V9IndexLoader implements IndexLoader { private final ColumnConfig columnConfig; @@ -539,8 +571,15 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException throw new IllegalArgumentException(String.format("Expected version[9], got[%s]", theVersion)); } - SmooshedFileMapper smooshedFiles = Smoosh.map(inDir); + final QueryableIndex index = load(Smoosh.map(inDir), inDir, mapper); + + log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime); + + return index; + } + public QueryableIndex load(SmooshedFileMapper smooshedFiles, File inDir, ObjectMapper mapper) throws IOException + { ByteBuffer indexBuffer = smooshedFiles.mapFile("index.drd"); /** * Index.drd should consist of the segment version, the columns and dimensions of the segment as generic @@ -597,13 +636,9 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException columns.put(Column.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time"), smooshedFiles)); - final QueryableIndex index = new SimpleQueryableIndex( + return new SimpleQueryableIndex( dataInterval, cols, dims, segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles, metadata ); - - log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime); - - return index; } private Column deserializeColumn(ObjectMapper mapper, ByteBuffer byteBuffer, SmooshedFileMapper smooshedFiles) diff --git a/processing/src/main/java/io/druid/segment/StringDimensionMergerV9.java b/processing/src/main/java/io/druid/segment/StringDimensionMergerV9.java index 2f8e23d6895a..f225e0fbeee7 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionMergerV9.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionMergerV9.java @@ -19,6 +19,7 @@ package io.druid.segment; +import com.google.common.base.Function; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -90,6 +91,7 @@ public class StringDimensionMergerV9 implements DimensionMergerV9 protected ColumnCapabilities capabilities; protected final File outDir; protected List adapters; + protected List> dimValuesList; protected ProgressIndicator progress; protected final IndexSpec indexSpec; @@ -120,17 +122,19 @@ public void writeMergedValueMetadata(List adapters) throws IOE long dimStartTime = System.currentTimeMillis(); this.adapters = adapters; + this.dimValuesList = toDimValuesList(adapters); + int adaptersCount = dimValuesList.size(); - dimConversions = Lists.newArrayListWithCapacity(adapters.size()); - for (int i = 0; i < adapters.size(); ++i) { + dimConversions = Lists.newArrayListWithCapacity(adaptersCount); + for (int i = 0; i < dimValuesList.size(); ++i) { dimConversions.add(null); } int numMergeIndex = 0; Indexed dimValueLookup = null; - Indexed[] dimValueLookups = new Indexed[adapters.size() + 1]; - for (int i = 0; i < adapters.size(); i++) { - Indexed dimValues = (Indexed) adapters.get(i).getDimValueLookup(dimensionName); + Indexed[] dimValueLookups = new Indexed[adaptersCount + 1]; + for (int i = 0; i < adaptersCount; i++) { + Indexed dimValues = dimValuesList.get(i); if (!isNullColumn(dimValues)) { dimHasValues = true; hasNull |= dimValues.indexOf(null) >= 0; @@ -152,7 +156,7 @@ public void writeMergedValueMetadata(List adapters) throws IOE */ if (convertMissingValues && !hasNull) { hasNull = true; - dimValueLookups[adapters.size()] = dimValueLookup = EMPTY_STR_DIM_VAL; + dimValueLookups[adaptersCount] = dimValueLookup = EMPTY_STR_DIM_VAL; numMergeIndex++; } @@ -172,7 +176,7 @@ public void writeMergedValueMetadata(List adapters) throws IOE dictionaryWriter.write(iterator.next()); } - for (int i = 0; i < adapters.size(); i++) { + for (int i = 0; i < adaptersCount; i++) { if (dimValueLookups[i] != null && iterator.needConversion(i)) { dimConversions.set(i, iterator.conversions[i]); } @@ -196,6 +200,20 @@ public void writeMergedValueMetadata(List adapters) throws IOE setupEncodedValueWriter(); } + protected List> toDimValuesList(List adapters) throws IOException + { + return Lists.newArrayList( + Lists.transform(adapters, new Function>() + { + @Override + public Indexed apply(IndexableAdapter input) + { + return input.getDimValueLookup(dimensionName); + } + }) + ); + } + protected void setupEncodedValueWriter() throws IOException { final CompressedObjectStrategy.CompressionStrategy compressionStrategy = indexSpec.getDimensionCompression(); @@ -316,12 +334,12 @@ public void close() tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50, bmpFactory), bmpFactory); } - IndexSeeker[] dictIdSeeker = toIndexSeekers(adapters, dimConversions, dimensionName); + IndexSeeker[] dictIdSeeker = toIndexSeekers(dimValuesList, dimConversions, dimensionName); //Iterate all dim values's dictionary id in ascending order which in line with dim values's compare result. for (int dictId = 0; dictId < dimVals.size(); dictId++) { progress.progress(); - mergeBitmaps( + mergeBitmapsInternal( segmentRowNumConversions, dimVals, bmpFactory, @@ -351,6 +369,34 @@ public void close() } } + protected void mergeBitmapsInternal( + List segmentRowNumConversions, + Indexed dimVals, + BitmapFactory bmpFactory, + RTree tree, + boolean hasSpatial, + IndexSeeker[] dictIdSeeker, + int dictId, + List adapters, + String dimensionName, + MutableBitmap nullRowsBitmap, + GenericIndexedWriter bitmapWriter) throws IOException + { + mergeBitmaps( + segmentRowNumConversions, + dimVals, + bmpFactory, + tree, + hasSpatial, + dictIdSeeker, + dictId, + adapters, + dimensionName, + nullRowsBitmap, + bitmapWriter + ); + } + static void mergeBitmaps( List segmentRowNumConversions, Indexed dimVals, @@ -585,18 +631,18 @@ public int skip(int n) } protected IndexSeeker[] toIndexSeekers( - List adapters, + List> dimValuesList, ArrayList dimConversions, String dimension ) { - IndexSeeker[] seekers = new IndexSeeker[adapters.size()]; - for (int i = 0; i < adapters.size(); i++) { + IndexSeeker[] seekers = new IndexSeeker[dimValuesList.size()]; + for (int i = 0; i < dimValuesList.size(); i++) { IntBuffer dimConversion = dimConversions.get(i); if (dimConversion != null) { seekers[i] = new IndexSeekerWithConversion((IntBuffer) dimConversion.asReadOnlyBuffer().rewind()); } else { - Indexed dimValueLookup = (Indexed) adapters.get(i).getDimValueLookup(dimension); + Indexed dimValueLookup = dimValuesList.get(i); seekers[i] = new IndexSeekerWithoutConversion(dimValueLookup == null ? 0 : dimValueLookup.size()); } } diff --git a/processing/src/main/java/io/druid/segment/loading/MMappedQueryableSegmentizerFactory.java b/processing/src/main/java/io/druid/segment/loading/MMappedQueryableSegmentizerFactory.java index 23c9007c9ab0..7834c14fd08b 100644 --- a/processing/src/main/java/io/druid/segment/loading/MMappedQueryableSegmentizerFactory.java +++ b/processing/src/main/java/io/druid/segment/loading/MMappedQueryableSegmentizerFactory.java @@ -23,6 +23,7 @@ import com.google.common.base.Preconditions; import io.druid.java.util.common.logger.Logger; import io.druid.segment.IndexIO; +import io.druid.segment.QueryableIndex; import io.druid.segment.QueryableIndexSegment; import io.druid.segment.Segment; import io.druid.timeline.DataSegment; @@ -47,10 +48,16 @@ public MMappedQueryableSegmentizerFactory(@JacksonInject IndexIO indexIO) public Segment factorize(DataSegment dataSegment, File parentDir) throws SegmentLoadingException { try { - return new QueryableIndexSegment(dataSegment.getIdentifier(), indexIO.loadIndex(parentDir)); + return new QueryableIndexSegment(dataSegment.getIdentifier(), loadIndex(parentDir)); } catch (IOException e) { throw new SegmentLoadingException(e, "%s", e.getMessage()); } } + + @Override + public QueryableIndex loadIndex(File parentDir) throws IOException + { + return indexIO.loadIndexDirectly(parentDir); + } } diff --git a/processing/src/main/java/io/druid/segment/loading/SegmentizerFactory.java b/processing/src/main/java/io/druid/segment/loading/SegmentizerFactory.java index 89aa9f002b8f..388800ec065c 100644 --- a/processing/src/main/java/io/druid/segment/loading/SegmentizerFactory.java +++ b/processing/src/main/java/io/druid/segment/loading/SegmentizerFactory.java @@ -20,10 +20,12 @@ package io.druid.segment.loading; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import io.druid.segment.QueryableIndex; import io.druid.segment.Segment; import io.druid.timeline.DataSegment; import java.io.File; +import java.io.IOException; /** * Factory that loads segment files from the disk and creates {@link Segment} object @@ -32,4 +34,6 @@ public interface SegmentizerFactory { public Segment factorize(DataSegment segment, File parentDir) throws SegmentLoadingException; + + public QueryableIndex loadIndex(File parentDir) throws IOException; } diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index fd7da345cf52..9783ed6589c9 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -174,7 +174,7 @@ public static final AggregationTestHelper createSelectQueryAggregationTestHelper new InjectableValues.Std().addValue( SelectQueryConfig.class, new SelectQueryConfig(true) - ) + ).addValue(IndexIO.class, TestHelper.getTestIndexIO()) ); Supplier configSupplier = Suppliers.ofInstance(new SelectQueryConfig(true)); diff --git a/processing/src/test/java/io/druid/segment/TestHelper.java b/processing/src/test/java/io/druid/segment/TestHelper.java index 16259313b660..bc520821eb20 100644 --- a/processing/src/test/java/io/druid/segment/TestHelper.java +++ b/processing/src/test/java/io/druid/segment/TestHelper.java @@ -43,9 +43,10 @@ public class TestHelper { private static final IndexMergerV9 INDEX_MERGER_V9; private static final IndexIO INDEX_IO; + private static final ObjectMapper jsonMapper; static { - final ObjectMapper jsonMapper = getJsonMapper(); + jsonMapper = new DefaultObjectMapper(); INDEX_IO = new IndexIO( jsonMapper, new ColumnConfig() @@ -57,6 +58,12 @@ public int columnCacheSizeBytes() } } ); + jsonMapper.setInjectableValues( + new InjectableValues.Std() + .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) + .addValue(ObjectMapper.class.getName(), jsonMapper) + .addValue(IndexIO.class, INDEX_IO) + ); INDEX_MERGER_V9 = new IndexMergerV9(jsonMapper, INDEX_IO); } @@ -72,24 +79,12 @@ public static IndexIO getTestIndexIO() public static ObjectMapper getJsonMapper() { - final ObjectMapper mapper = new DefaultObjectMapper(); - mapper.setInjectableValues( - new InjectableValues.Std() - .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) - .addValue(ObjectMapper.class.getName(), mapper) - ); - return mapper; + return jsonMapper; } public static ObjectMapper getSmileMapper() { - final ObjectMapper mapper = new DefaultObjectMapper(); - mapper.setInjectableValues( - new InjectableValues.Std() - .addValue(ExprMacroTable.class.getName(), TestExprMacroTable.INSTANCE) - .addValue(ObjectMapper.class.getName(), mapper) - ); - return mapper; + return jsonMapper; } public static Iterable revert(Iterable input) diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java index 19a97f7816a2..b96941c179ef 100644 --- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java +++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderLocalCacheManager.java @@ -102,22 +102,13 @@ public StorageLocation findStorageLocationIfLoaded(final DataSegment segment) @Override public Segment getSegment(DataSegment segment) throws SegmentLoadingException { - File segmentFiles = getSegmentFiles(segment); - File factoryJson = new File(segmentFiles, "factory.json"); - final SegmentizerFactory factory; - - if (factoryJson.exists()) { - try { - factory = jsonMapper.readValue(factoryJson, SegmentizerFactory.class); - } - catch (IOException e) { - throw new SegmentLoadingException(e, "%s", e.getMessage()); - } - } else { - factory = new MMappedQueryableSegmentizerFactory(indexIO); + try { + File segmentDir = getSegmentFiles(segment); + return indexIO.getSegmentizerFactory(segmentDir).factorize(segment, segmentDir); + } + catch (IOException e) { + throw new SegmentLoadingException(e, "%s", e.getMessage()); } - - return factory.factorize(segment, segmentFiles); } @Override diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index 2c7c97d6a9af..d393b2eda02c 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -19,6 +19,7 @@ package io.druid.segment.realtime.appenderator; +import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.metamx.common.logger.Logger; @@ -156,6 +157,11 @@ public int columnCacheSizeBytes() } } ); + objectMapper.setInjectableValues( + new InjectableValues.Std() + .addValue(ObjectMapper.class.getName(), objectMapper) + .addValue(IndexIO.class, indexIO) + ); indexMerger = new IndexMergerV9(objectMapper, indexIO); emitter = new ServiceEmitter(