From d379f38c2efda2c2229764652b301d141991731c Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 18 Jan 2020 13:20:17 -0800 Subject: [PATCH 1/3] Support both IndexTuningConfig and ParallelIndexTuningConfig for compaction task --- .../indexing/common/task/CompactionTask.java | 57 +++- .../common/task/CompactionTaskTest.java | 258 ++++++++++++++++++ 2 files changed, 310 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index e85e57cd5354..679e6c53d85c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -51,6 +51,7 @@ import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; +import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask; @@ -74,6 +75,7 @@ import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ValueType; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.loading.SegmentLoadingException; @@ -177,7 +179,7 @@ public CompactionTask( @JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec, @JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec, @JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity, - @JsonProperty("tuningConfig") @Nullable final ParallelIndexTuningConfig tuningConfig, + @JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig, @JsonProperty("context") @Nullable final Map context, @JacksonInject ObjectMapper jsonMapper, @JacksonInject AuthorizerMapper authorizerMapper, @@ -213,10 +215,10 @@ public CompactionTask( this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec; this.metricsSpec = metricsSpec; this.segmentGranularity = segmentGranularity; - this.tuningConfig = tuningConfig; + this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null; this.jsonMapper = jsonMapper; this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec()); - this.partitionConfigurationManager = new PartitionConfigurationManager(tuningConfig); + this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig); this.authorizerMapper = authorizerMapper; this.chatHandlerProvider = chatHandlerProvider; this.rowIngestionMetersFactory = rowIngestionMetersFactory; @@ -227,6 +229,51 @@ public CompactionTask( this.appenderatorsManager = appenderatorsManager; } + @VisibleForTesting + static ParallelIndexTuningConfig getTuningConfig(TuningConfig tuningConfig) + { + if (tuningConfig instanceof ParallelIndexTuningConfig) { + return (ParallelIndexTuningConfig) tuningConfig; + } else if (tuningConfig instanceof IndexTuningConfig) { + final IndexTuningConfig indexTuningConfig = (IndexTuningConfig) tuningConfig; + return new ParallelIndexTuningConfig( + null, + indexTuningConfig.getMaxRowsPerSegment(), + indexTuningConfig.getMaxRowsPerSegment(), + indexTuningConfig.getMaxBytesInMemory(), + indexTuningConfig.getMaxTotalRows(), + indexTuningConfig.getNumShards(), + null, + indexTuningConfig.getPartitionsSpec(), + indexTuningConfig.getIndexSpec(), + indexTuningConfig.getIndexSpecForIntermediatePersists(), + indexTuningConfig.getMaxPendingPersists(), + indexTuningConfig.isForceGuaranteedRollup(), + indexTuningConfig.isReportParseExceptions(), + indexTuningConfig.getPushTimeout(), + indexTuningConfig.getSegmentWriteOutMediumFactory(), + null, + null, + null, + null, + null, + null, + null, + null, + indexTuningConfig.isLogParseExceptions(), + indexTuningConfig.getMaxParseExceptions(), + indexTuningConfig.getMaxSavedParseExceptions() + ); + } else { + throw new ISE( + "Unknown tuningConfig type: [%s], Must be either [%s] or [%s]", + tuningConfig.getClass().getName(), + ParallelIndexTuningConfig.class.getName(), + IndexTuningConfig.class.getName() + ); + } + } + @JsonProperty public CompactionIOConfig getIoConfig() { @@ -848,7 +895,7 @@ public static class Builder @Nullable private Granularity segmentGranularity; @Nullable - private ParallelIndexTuningConfig tuningConfig; + private TuningConfig tuningConfig; @Nullable private Map context; @@ -911,7 +958,7 @@ public Builder segmentGranularity(Granularity segmentGranularity) return this; } - public Builder tuningConfig(ParallelIndexTuningConfig tuningConfig) + public Builder tuningConfig(TuningConfig tuningConfig) { this.tuningConfig = tuningConfig; return this; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index c72f5fedfcf0..65c0f3da78ca 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -19,7 +19,11 @@ package org.apache.druid.indexing.common.task; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.exc.ValueInstantiationException; import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.fasterxml.jackson.databind.module.SimpleModule; @@ -44,6 +48,7 @@ import org.apache.druid.guice.GuiceAnnotationIntrospector; import org.apache.druid.guice.GuiceInjectableValues; import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexing.common.RetryPolicyConfig; import org.apache.druid.indexing.common.RetryPolicyFactory; @@ -53,10 +58,12 @@ import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; +import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; import org.apache.druid.indexing.common.task.CompactionTask.Builder; import org.apache.druid.indexing.common.task.CompactionTask.PartitionConfigurationManager; import org.apache.druid.indexing.common.task.CompactionTask.SegmentProvider; +import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec; import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; @@ -97,6 +104,8 @@ import org.apache.druid.segment.data.RoaringBitmapSerdeFactory; import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.RealtimeTuningConfig; +import org.apache.druid.segment.indexing.TuningConfig; import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; @@ -118,6 +127,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -423,6 +433,128 @@ public void testSerdeWithDimensions() throws IOException assertEquals(task, fromJson); } + @Test + public void testSerdeWithOldTuningConfigSuccessfullyDeserializeToNewOne() throws IOException + { + final OldCompactionTaskWithAnyTuningConfigType oldTask = new OldCompactionTaskWithAnyTuningConfigType( + null, + null, + DATA_SOURCE, + null, + SEGMENTS, + null, + null, + null, + null, + null, + new IndexTuningConfig( + null, + null, // null to compute maxRowsPerSegment automatically + 500000, + 1000000L, + null, + null, + null, + null, + null, + new IndexSpec( + new RoaringBitmapSerdeFactory(true), + CompressionStrategy.LZ4, + CompressionStrategy.LZF, + LongEncodingStrategy.LONGS + ), + null, + null, + true, + false, + 5000L, + null, + null, + null, + null, + null + ), + null, + OBJECT_MAPPER, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + null, + ROW_INGESTION_METERS_FACTORY, + COORDINATOR_CLIENT, + segmentLoaderFactory, + RETRY_POLICY_FACTORY, + APPENDERATORS_MANAGER + ); + + final Builder builder = new Builder( + DATA_SOURCE, + OBJECT_MAPPER, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + null, + ROW_INGESTION_METERS_FACTORY, + INDEXING_SERVICE_CLIENT, + COORDINATOR_CLIENT, + segmentLoaderFactory, + RETRY_POLICY_FACTORY, + APPENDERATORS_MANAGER + ); + + final CompactionTask expectedFromJson = builder + .segments(SEGMENTS) + .tuningConfig(CompactionTask.getTuningConfig(oldTask.getTuningConfig())) + .build(); + + final ObjectMapper mapper = new DefaultObjectMapper((DefaultObjectMapper) OBJECT_MAPPER); + mapper.registerSubtypes( + new NamedType(OldCompactionTaskWithAnyTuningConfigType.class, "compact"), + new NamedType(IndexTuningConfig.class, "index") + ); + final byte[] bytes = mapper.writeValueAsBytes(oldTask); + final CompactionTask fromJson = mapper.readValue(bytes, CompactionTask.class); + assertEquals(expectedFromJson, fromJson); + } + + @Test + public void testSerdeWithUnknownTuningConfigThrowingError() throws IOException + { + final OldCompactionTaskWithAnyTuningConfigType taskWithUnknownTuningConfig = + new OldCompactionTaskWithAnyTuningConfigType( + null, + null, + DATA_SOURCE, + null, + SEGMENTS, + null, + null, + null, + null, + null, + RealtimeTuningConfig.makeDefaultTuningConfig(null), + null, + OBJECT_MAPPER, + AuthTestUtils.TEST_AUTHORIZER_MAPPER, + null, + ROW_INGESTION_METERS_FACTORY, + COORDINATOR_CLIENT, + segmentLoaderFactory, + RETRY_POLICY_FACTORY, + APPENDERATORS_MANAGER + ); + + final ObjectMapper mapper = new DefaultObjectMapper((DefaultObjectMapper) OBJECT_MAPPER); + mapper.registerSubtypes( + new NamedType(OldCompactionTaskWithAnyTuningConfigType.class, "compact"), + new NamedType(RealtimeTuningConfig.class, "realtime") + ); + final byte[] bytes = mapper.writeValueAsBytes(taskWithUnknownTuningConfig); + + expectedException.expect(ValueInstantiationException.class); + expectedException.expectCause(CoreMatchers.instanceOf(IllegalStateException.class)); + expectedException.expectMessage( + "Unknown tuningConfig type: [org.apache.druid.segment.indexing.RealtimeTuningConfig]" + ); + mapper.readValue(bytes, CompactionTask.class); + } + private static void assertEquals(CompactionTask expected, CompactionTask actual) { Assert.assertEquals(expected.getType(), actual.getType()); @@ -1348,4 +1480,130 @@ public SpatialIndex getSpatialIndex() return null; } } + + /** + * The compaction task spec in 0.16.0 except for the tuningConfig. + * The original spec accepts only {@link IndexTuningConfig}, but this class acceps any type of tuningConfig for + * testing. + */ + private static class OldCompactionTaskWithAnyTuningConfigType extends AbstractTask + { + private final Interval interval; + private final List segments; + @Nullable + private final DimensionsSpec dimensionsSpec; + @Nullable + private final AggregatorFactory[] metricsSpec; + @Nullable + private final Granularity segmentGranularity; + @Nullable + private final Long targetCompactionSizeBytes; + @Nullable + private final TuningConfig tuningConfig; + + @JsonCreator + public OldCompactionTaskWithAnyTuningConfigType( + @JsonProperty("id") final String id, + @JsonProperty("resource") final TaskResource taskResource, + @JsonProperty("dataSource") final String dataSource, + @JsonProperty("interval") @Nullable final Interval interval, + @JsonProperty("segments") @Nullable final List segments, + @JsonProperty("dimensions") @Nullable final DimensionsSpec dimensions, + @JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec, + @JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec, + @JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity, + @JsonProperty("targetCompactionSizeBytes") @Nullable final Long targetCompactionSizeBytes, + @JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig, + @JsonProperty("context") @Nullable final Map context, + @JacksonInject ObjectMapper jsonMapper, + @JacksonInject AuthorizerMapper authorizerMapper, + @JacksonInject ChatHandlerProvider chatHandlerProvider, + @JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory, + @JacksonInject CoordinatorClient coordinatorClient, + @JacksonInject SegmentLoaderFactory segmentLoaderFactory, + @JacksonInject RetryPolicyFactory retryPolicyFactory, + @JacksonInject AppenderatorsManager appenderatorsManager + ) + { + super(getOrMakeId(id, "compact", dataSource), null, taskResource, dataSource, context); + this.interval = interval; + this.segments = segments; + this.dimensionsSpec = dimensionsSpec; + this.metricsSpec = metricsSpec; + this.segmentGranularity = segmentGranularity; + this.targetCompactionSizeBytes = targetCompactionSizeBytes; + this.tuningConfig = tuningConfig; + } + + @Override + public String getType() + { + return "compact"; + } + + @JsonProperty + public Interval getInterval() + { + return interval; + } + + @JsonProperty + public List getSegments() + { + return segments; + } + + @JsonProperty + @Nullable + public DimensionsSpec getDimensionsSpec() + { + return dimensionsSpec; + } + + @JsonProperty + @Nullable + public AggregatorFactory[] getMetricsSpec() + { + return metricsSpec; + } + + @JsonProperty + @Nullable + public Granularity getSegmentGranularity() + { + return segmentGranularity; + } + + @Nullable + @JsonProperty + public Long getTargetCompactionSizeBytes() + { + return targetCompactionSizeBytes; + } + + @Nullable + @JsonProperty + public TuningConfig getTuningConfig() + { + return tuningConfig; + } + + @Override + public boolean isReady(TaskActionClient taskActionClient) + { + throw new UnsupportedOperationException(); + } + + @Override + public void stopGracefully(TaskConfig taskConfig) + { + throw new UnsupportedOperationException(); + } + + @Override + public TaskStatus run(TaskToolbox toolbox) + { + throw new UnsupportedOperationException(); + } + } } From 24209bb05222fb5ff083bcf2fb290e8c9b6fc21b Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Sat, 18 Jan 2020 15:08:25 -0800 Subject: [PATCH 2/3] tuningConfig module --- .../IndexingServiceTuningConfigModule.java | 51 +++++++++++++++++++ .../druid/indexing/common/task/IndexTask.java | 1 - .../parallel/ParallelIndexTuningConfig.java | 2 - .../common/task/CompactionTaskTest.java | 7 ++- .../java/org/apache/druid/cli/CliIndexer.java | 2 + .../apache/druid/cli/CliMiddleManager.java | 2 + .../org/apache/druid/cli/CliOverlord.java | 2 + .../java/org/apache/druid/cli/CliPeon.java | 2 + 8 files changed, 62 insertions(+), 7 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTuningConfigModule.java diff --git a/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTuningConfigModule.java b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTuningConfigModule.java new file mode 100644 index 000000000000..4799f2ad2805 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/guice/IndexingServiceTuningConfigModule.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.guice; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; +import org.apache.druid.initialization.DruidModule; + +import java.util.List; + +public class IndexingServiceTuningConfigModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule(IndexingServiceTuningConfigModule.class.getSimpleName()) + .registerSubtypes( + new NamedType(IndexTuningConfig.class, "index"), + new NamedType(ParallelIndexTuningConfig.class, "index_parallel") + ) + ); + } + + @Override + public void configure(Binder binder) + { + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index d0733d2f5b16..4d39df4c1cc6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1171,7 +1171,6 @@ public boolean isAppendToExisting() } } - @JsonTypeName("index") public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig { private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index dbfc73d89b3f..feed00f17ff0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; @@ -35,7 +34,6 @@ import javax.annotation.Nullable; import java.util.Objects; -@JsonTypeName("index_parallel") public class ParallelIndexTuningConfig extends IndexTuningConfig { private static final int DEFAULT_MAX_NUM_CONCURRENT_SUB_TASKS = 1; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 65c0f3da78ca..1289944fa2d4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -48,6 +48,7 @@ import org.apache.druid.guice.GuiceAnnotationIntrospector; import org.apache.druid.guice.GuiceInjectableValues; import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.guice.IndexingServiceTuningConfigModule; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexing.common.RetryPolicyConfig; @@ -272,6 +273,7 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa objectMapper.registerModule( new SimpleModule().registerSubtypes(new NamedType(NumberedShardSpec.class, "NumberedShardSpec")) ); + objectMapper.registerModules(new IndexingServiceTuningConfigModule().getJacksonModules()); return objectMapper; } @@ -504,10 +506,7 @@ public void testSerdeWithOldTuningConfigSuccessfullyDeserializeToNewOne() throws .build(); final ObjectMapper mapper = new DefaultObjectMapper((DefaultObjectMapper) OBJECT_MAPPER); - mapper.registerSubtypes( - new NamedType(OldCompactionTaskWithAnyTuningConfigType.class, "compact"), - new NamedType(IndexTuningConfig.class, "index") - ); + mapper.registerSubtypes(new NamedType(OldCompactionTaskWithAnyTuningConfigType.class, "compact")); final byte[] bytes = mapper.writeValueAsBytes(oldTask); final CompactionTask fromJson = mapper.readValue(bytes, CompactionTask.class); assertEquals(expectedFromJson, fromJson); diff --git a/services/src/main/java/org/apache/druid/cli/CliIndexer.java b/services/src/main/java/org/apache/druid/cli/CliIndexer.java index 5ffd2a83483a..bf955a998643 100644 --- a/services/src/main/java/org/apache/druid/cli/CliIndexer.java +++ b/services/src/main/java/org/apache/druid/cli/CliIndexer.java @@ -37,6 +37,7 @@ import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceTaskLogsModule; +import org.apache.druid.guice.IndexingServiceTuningConfigModule; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; @@ -201,6 +202,7 @@ public DataNodeService getDataNodeService() new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), + new IndexingServiceTuningConfigModule(), new QueryablePeonModule(), new CliIndexerServerModule(properties), new LookupModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java index 0eeb5484919d..0d350eb89c54 100644 --- a/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java +++ b/services/src/main/java/org/apache/druid/cli/CliMiddleManager.java @@ -37,6 +37,7 @@ import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceTaskLogsModule; +import org.apache.druid.guice.IndexingServiceTuningConfigModule; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; @@ -182,6 +183,7 @@ public WorkerNodeService getWorkerNodeService(WorkerConfig workerConfig) new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), + new IndexingServiceTuningConfigModule(), new LookupSerdeModule() ); } diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index f84a5e18ee27..f63c68e951b6 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -43,6 +43,7 @@ import org.apache.druid.guice.IndexingServiceInputSourceModule; import org.apache.druid.guice.IndexingServiceModuleHelper; import org.apache.druid.guice.IndexingServiceTaskLogsModule; +import org.apache.druid.guice.IndexingServiceTuningConfigModule; import org.apache.druid.guice.JacksonConfigProvider; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.JsonConfigProvider; @@ -344,6 +345,7 @@ private void configureOverlordHelpers(Binder binder) new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), new IndexingServiceTaskLogsModule(), + new IndexingServiceTuningConfigModule(), new SupervisorModule(), new LookupSerdeModule(), new SamplerModule() diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 1e82c6ded5ee..3707b820cf0b 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -46,6 +46,7 @@ import org.apache.druid.guice.DruidProcessingModule; import org.apache.druid.guice.IndexingServiceFirehoseModule; import org.apache.druid.guice.IndexingServiceInputSourceModule; +import org.apache.druid.guice.IndexingServiceTuningConfigModule; import org.apache.druid.guice.Jerseys; import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; @@ -263,6 +264,7 @@ public SegmentListerResource getSegmentListerResource( new QueryablePeonModule(), new IndexingServiceFirehoseModule(), new IndexingServiceInputSourceModule(), + new IndexingServiceTuningConfigModule(), new ChatHandlerServerModule(properties), new LookupModule() ); From 40285ba93919165cd51ac65e2c53a072418742d4 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 21 Jan 2020 11:25:00 -0800 Subject: [PATCH 3/3] fix tests --- .../indexing/common/task/ClientCompactQuerySerdeTest.java | 3 +++ .../apache/druid/indexing/common/task/TaskSerdeTest.java | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java index a4ac7672b257..a67128ef6d01 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactQuerySerdeTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair; +import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.collect.ImmutableList; import org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.client.indexing.ClientCompactQuery; @@ -36,6 +37,7 @@ import org.apache.druid.indexing.common.SegmentLoaderFactory; import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.segment.IndexSpec; @@ -168,6 +170,7 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa ) ); objectMapper.setInjectableValues(injectableValues); + objectMapper.registerSubtypes(new NamedType(ParallelIndexTuningConfig.class, "index_parallel")); return objectMapper; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index c5841eaa27b4..b8ae69e2894f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.client.indexing.ClientKillQuery; @@ -38,6 +39,7 @@ import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig; import org.apache.druid.indexing.common.task.IndexTask.IndexIngestionSpec; import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig; +import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.query.aggregation.AggregatorFactory; @@ -78,6 +80,10 @@ public TaskSerdeTest() for (final Module jacksonModule : new FirehoseModule().getJacksonModules()) { jsonMapper.registerModule(jacksonModule); } + jsonMapper.registerSubtypes( + new NamedType(ParallelIndexTuningConfig.class, "index_parallel"), + new NamedType(IndexTuningConfig.class, "index") + ); } @Test