Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.guice;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.initialization.DruidModule;

import java.util.List;

public class IndexingServiceTuningConfigModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule(IndexingServiceTuningConfigModule.class.getSimpleName())
.registerSubtypes(
new NamedType(IndexTuningConfig.class, "index"),
new NamedType(ParallelIndexTuningConfig.class, "index_parallel")
)
);
}

@Override
public void configure(Binder binder)
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
Expand All @@ -74,6 +75,7 @@
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
Expand Down Expand Up @@ -177,7 +179,7 @@ public CompactionTask(
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity,
@JsonProperty("tuningConfig") @Nullable final ParallelIndexTuningConfig tuningConfig,
@JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject ObjectMapper jsonMapper,
@JacksonInject AuthorizerMapper authorizerMapper,
Expand Down Expand Up @@ -213,10 +215,10 @@ public CompactionTask(
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
this.tuningConfig = tuningConfig;
this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null;
this.jsonMapper = jsonMapper;
this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
this.partitionConfigurationManager = new PartitionConfigurationManager(tuningConfig);
this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig);
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
Expand All @@ -227,6 +229,51 @@ public CompactionTask(
this.appenderatorsManager = appenderatorsManager;
}

@VisibleForTesting
static ParallelIndexTuningConfig getTuningConfig(TuningConfig tuningConfig)
{
if (tuningConfig instanceof ParallelIndexTuningConfig) {
return (ParallelIndexTuningConfig) tuningConfig;
} else if (tuningConfig instanceof IndexTuningConfig) {
final IndexTuningConfig indexTuningConfig = (IndexTuningConfig) tuningConfig;
return new ParallelIndexTuningConfig(
null,
indexTuningConfig.getMaxRowsPerSegment(),
indexTuningConfig.getMaxRowsPerSegment(),
indexTuningConfig.getMaxBytesInMemory(),
indexTuningConfig.getMaxTotalRows(),
indexTuningConfig.getNumShards(),
null,
indexTuningConfig.getPartitionsSpec(),
indexTuningConfig.getIndexSpec(),
indexTuningConfig.getIndexSpecForIntermediatePersists(),
indexTuningConfig.getMaxPendingPersists(),
indexTuningConfig.isForceGuaranteedRollup(),
indexTuningConfig.isReportParseExceptions(),
indexTuningConfig.getPushTimeout(),
indexTuningConfig.getSegmentWriteOutMediumFactory(),
null,
null,
null,
null,
null,
null,
null,
null,
indexTuningConfig.isLogParseExceptions(),
indexTuningConfig.getMaxParseExceptions(),
indexTuningConfig.getMaxSavedParseExceptions()
);
} else {
throw new ISE(
"Unknown tuningConfig type: [%s], Must be either [%s] or [%s]",
tuningConfig.getClass().getName(),
ParallelIndexTuningConfig.class.getName(),
IndexTuningConfig.class.getName()
);
}
}

@JsonProperty
public CompactionIOConfig getIoConfig()
{
Expand Down Expand Up @@ -848,7 +895,7 @@ public static class Builder
@Nullable
private Granularity segmentGranularity;
@Nullable
private ParallelIndexTuningConfig tuningConfig;
private TuningConfig tuningConfig;
@Nullable
private Map<String, Object> context;

Expand Down Expand Up @@ -911,7 +958,7 @@ public Builder segmentGranularity(Granularity segmentGranularity)
return this;
}

public Builder tuningConfig(ParallelIndexTuningConfig tuningConfig)
public Builder tuningConfig(TuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1171,7 +1171,6 @@ public boolean isAppendToExisting()
}
}

@JsonTypeName("index")
public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig
{
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
Expand All @@ -35,7 +34,6 @@
import javax.annotation.Nullable;
import java.util.Objects;

@JsonTypeName("index_parallel")
public class ParallelIndexTuningConfig extends IndexTuningConfig
{
private static final int DEFAULT_MAX_NUM_CONCURRENT_SUB_TASKS = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactQuery;
Expand All @@ -36,6 +37,7 @@
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.IndexSpec;
Expand Down Expand Up @@ -168,6 +170,7 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa
)
);
objectMapper.setInjectableValues(injectableValues);
objectMapper.registerSubtypes(new NamedType(ParallelIndexTuningConfig.class, "index_parallel"));
return objectMapper;
}
}
Loading