Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
import org.apache.druid.segment.realtime.plumber.SegmentHandoffNotifier;
Expand Down Expand Up @@ -2559,9 +2558,7 @@ public List<StorageLocationConfig> getLocations()
this::makeTimeseriesAndScanConglomerate,
Execs.directExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexIO(),
MapCache.create(1024),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@
import org.apache.druid.segment.loading.LocalDataSegmentPusher;
import org.apache.druid.segment.loading.LocalDataSegmentPusherConfig;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
import org.apache.druid.segment.loading.StorageLocationConfig;
import org.apache.druid.segment.realtime.appenderator.AppenderatorImpl;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
Expand Down Expand Up @@ -2765,9 +2764,7 @@ public List<StorageLocationConfig> getLocations()
this::makeTimeseriesOnlyConglomerate,
Execs.directExecutor(), // queryExecutorService
EasyMock.createMock(MonitorScheduler.class),
new SegmentLoaderFactory(
new SegmentLoaderLocalCacheManager(null, segmentLoaderConfig, testUtils.getTestObjectMapper())
),
new SegmentLoaderFactory(null, testUtils.getTestObjectMapper()),
testUtils.getTestObjectMapper(),
testUtils.getTestIndexIO(),
MapCache.create(1024),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

package org.apache.druid.indexing.common;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.loading.SegmentLoader;
import org.apache.druid.segment.loading.SegmentLoaderConfig;
import org.apache.druid.segment.loading.SegmentLoaderLocalCacheManager;
Expand All @@ -29,23 +32,30 @@
import java.util.Collections;

/**
*
*/
public class SegmentLoaderFactory
{
private final SegmentLoaderLocalCacheManager loader;
private final IndexIO indexIO;
private final ObjectMapper jsonMapper;

@Inject
public SegmentLoaderFactory(
SegmentLoaderLocalCacheManager loader
IndexIO indexIO,
@Json ObjectMapper mapper
)
{
this.loader = loader;
this.indexIO = indexIO;
this.jsonMapper = mapper;
}

public SegmentLoader manufacturate(File storageDir)
{
return loader.withConfig(
new SegmentLoaderConfig().withLocations(Collections.singletonList(new StorageLocationConfig().setPath(storageDir)))
return new SegmentLoaderLocalCacheManager(
indexIO,
new SegmentLoaderConfig().withLocations(
Collections.singletonList(new StorageLocationConfig().setPath(storageDir))),
jsonMapper
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Lists;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionSchema.MultiValueHandling;
import org.apache.druid.data.input.impl.DimensionsSpec;
Expand All @@ -40,6 +41,8 @@
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimeAndDimsParseSpec;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.RetryPolicyFactory;
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
Expand Down Expand Up @@ -132,6 +135,15 @@ public class CompactionTask extends AbstractTask
@JsonIgnore
private final RowIngestionMetersFactory rowIngestionMetersFactory;

@JsonIgnore
private final CoordinatorClient coordinatorClient;

@JsonIgnore
private final SegmentLoaderFactory segmentLoaderFactory;

@JsonIgnore
private final RetryPolicyFactory retryPolicyFactory;

@JsonIgnore
private List<IndexTask> indexTaskSpecs;

Expand All @@ -153,7 +165,10 @@ public CompactionTask(
@JacksonInject ObjectMapper jsonMapper,
@JacksonInject AuthorizerMapper authorizerMapper,
@JacksonInject ChatHandlerProvider chatHandlerProvider,
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory
@JacksonInject RowIngestionMetersFactory rowIngestionMetersFactory,
@JacksonInject CoordinatorClient coordinatorClient,
@JacksonInject SegmentLoaderFactory segmentLoaderFactory,
@JacksonInject RetryPolicyFactory retryPolicyFactory
)
{
super(getOrMakeId(id, TYPE, dataSource), null, taskResource, dataSource, context);
Expand Down Expand Up @@ -186,6 +201,9 @@ public CompactionTask(
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
this.coordinatorClient = coordinatorClient;
this.segmentLoaderFactory = segmentLoaderFactory;
this.retryPolicyFactory = retryPolicyFactory;
}

@JsonProperty
Expand Down Expand Up @@ -278,20 +296,23 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
metricsSpec,
keepSegmentGranularity,
segmentGranularity,
jsonMapper
jsonMapper,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
).stream()
.map(spec -> new IndexTask(
getId(),
getGroupId(),
getTaskResource(),
getDataSource(),
spec,
getContext(),
authorizerMapper,
chatHandlerProvider,
rowIngestionMetersFactory
))
.collect(Collectors.toList());
.map(spec -> new IndexTask(
getId(),
getGroupId(),
getTaskResource(),
getDataSource(),
spec,
getContext(),
authorizerMapper,
chatHandlerProvider,
rowIngestionMetersFactory
))
.collect(Collectors.toList());
}

if (indexTaskSpecs.isEmpty()) {
Expand Down Expand Up @@ -338,7 +359,10 @@ static List<IndexIngestionSpec> createIngestionSchema(
@Nullable final AggregatorFactory[] metricsSpec,
@Nullable final Boolean keepSegmentGranularity,
@Nullable final Granularity segmentGranularity,
final ObjectMapper jsonMapper
final ObjectMapper jsonMapper,
final CoordinatorClient coordinatorClient,
final SegmentLoaderFactory segmentLoaderFactory,
final RetryPolicyFactory retryPolicyFactory
) throws IOException, SegmentLoadingException
{
Pair<Map<DataSegment, File>, List<TimelineObjectHolder<String, DataSegment>>> pair = prepareSegments(
Expand Down Expand Up @@ -379,7 +403,14 @@ static List<IndexIngestionSpec> createIngestionSchema(
return Collections.singletonList(
new IndexIngestionSpec(
dataSchema,
createIoConfig(toolbox, dataSchema, segmentProvider.interval),
createIoConfig(
toolbox,
dataSchema,
segmentProvider.interval,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
),
compactionTuningConfig
)
);
Expand Down Expand Up @@ -411,7 +442,14 @@ static List<IndexIngestionSpec> createIngestionSchema(
specs.add(
new IndexIngestionSpec(
dataSchema,
createIoConfig(toolbox, dataSchema, interval),
createIoConfig(
toolbox,
dataSchema,
interval,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
),
compactionTuningConfig
)
);
Expand All @@ -438,15 +476,29 @@ static List<IndexIngestionSpec> createIngestionSchema(
return Collections.singletonList(
new IndexIngestionSpec(
dataSchema,
createIoConfig(toolbox, dataSchema, segmentProvider.interval),
createIoConfig(
toolbox,
dataSchema,
segmentProvider.interval,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
),
compactionTuningConfig
)
);
}
}
}

private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema dataSchema, Interval interval)
private static IndexIOConfig createIoConfig(
TaskToolbox toolbox,
DataSchema dataSchema,
Interval interval,
CoordinatorClient coordinatorClient,
SegmentLoaderFactory segmentLoaderFactory,
RetryPolicyFactory retryPolicyFactory
)
{
return new IndexIOConfig(
new IngestSegmentFirehoseFactory(
Expand All @@ -456,7 +508,10 @@ private static IndexIOConfig createIoConfig(TaskToolbox toolbox, DataSchema data
// set dimensions and metrics names to make sure that the generated dataSchema is used for the firehose
dataSchema.getParser().getParseSpec().getDimensionsSpec().getDimensionNames(),
Arrays.stream(dataSchema.getAggregators()).map(AggregatorFactory::getName).collect(Collectors.toList()),
toolbox.getIndexIO()
toolbox.getIndexIO(),
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
),
false
);
Expand Down Expand Up @@ -811,7 +866,7 @@ IndexTuningConfig computeTuningConfig(List<Pair<QueryableIndex, DataSegment>> qu
* targetCompactionSizeBytes cannot be used with {@link IndexTuningConfig#maxRowsPerSegment},
* {@link IndexTuningConfig#maxTotalRows}, or {@link IndexTuningConfig#numShards} together.
* {@link #hasPartitionConfig} checks one of those configs is set.
*
* <p>
* This throws an {@link IllegalArgumentException} if targetCompactionSizeBytes is set and hasPartitionConfig
* returns true. If targetCompactionSizeBytes is not set, this returns null or
* {@link DataSourceCompactionConfig#DEFAULT_TARGET_COMPACTION_SIZE_BYTES} according to the result of
Expand Down Expand Up @@ -860,6 +915,9 @@ public static class Builder
private final AuthorizerMapper authorizerMapper;
private final ChatHandlerProvider chatHandlerProvider;
private final RowIngestionMetersFactory rowIngestionMetersFactory;
private final CoordinatorClient coordinatorClient;
private final SegmentLoaderFactory segmentLoaderFactory;
private final RetryPolicyFactory retryPolicyFactory;

@Nullable
private Interval interval;
Expand All @@ -885,14 +943,20 @@ public Builder(
ObjectMapper jsonMapper,
AuthorizerMapper authorizerMapper,
ChatHandlerProvider chatHandlerProvider,
RowIngestionMetersFactory rowIngestionMetersFactory
RowIngestionMetersFactory rowIngestionMetersFactory,
CoordinatorClient coordinatorClient,
SegmentLoaderFactory segmentLoaderFactory,
RetryPolicyFactory retryPolicyFactory
)
{
this.dataSource = dataSource;
this.jsonMapper = jsonMapper;
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
this.coordinatorClient = coordinatorClient;
this.segmentLoaderFactory = segmentLoaderFactory;
this.retryPolicyFactory = retryPolicyFactory;
}

public Builder interval(Interval interval)
Expand Down Expand Up @@ -968,7 +1032,10 @@ public CompactionTask build()
jsonMapper,
authorizerMapper,
chatHandlerProvider,
rowIngestionMetersFactory
rowIngestionMetersFactory,
coordinatorClient,
segmentLoaderFactory,
retryPolicyFactory
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import org.apache.druid.indexing.common.stats.RowIngestionMeters;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.stats.TaskRealtimeMetricsMonitor;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.JodaUtils;
Expand Down Expand Up @@ -84,7 +83,6 @@
import org.apache.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import org.apache.druid.segment.realtime.firehose.ChatHandler;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizerMapper;
Expand Down Expand Up @@ -419,8 +417,6 @@ public TaskStatus run(final TaskToolbox toolbox)

final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();

setFirehoseFactoryToolbox(firehoseFactory, toolbox);

final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
// Firehose temporary directory is automatically removed when this IndexTask completes.
FileUtils.forceMkdir(firehoseTempDir);
Expand Down Expand Up @@ -489,25 +485,6 @@ public TaskStatus run(final TaskToolbox toolbox)
}
}

// pass toolbox to any IngestSegmentFirehoseFactory
private void setFirehoseFactoryToolbox(FirehoseFactory firehoseFactory, TaskToolbox toolbox)
{
if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
return;
}

if (firehoseFactory instanceof CombiningFirehoseFactory) {
for (FirehoseFactory delegateFactory : ((CombiningFirehoseFactory) firehoseFactory).getDelegateFactoryList()) {
if (delegateFactory instanceof IngestSegmentFirehoseFactory) {
((IngestSegmentFirehoseFactory) delegateFactory).setTaskToolbox(toolbox);
} else if (delegateFactory instanceof CombiningFirehoseFactory) {
setFirehoseFactoryToolbox(delegateFactory, toolbox);
}
}
}
}

private Map<String, TaskReport> getTaskCompletionReports()
{
return TaskReport.buildTaskReports(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.firehose.IngestSegmentFirehoseFactory;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -185,11 +184,6 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();

if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
// pass toolbox to Firehose
((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
}

final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
// Firehose temporary directory is automatically removed when this IndexTask completes.
FileUtils.forceMkdir(firehoseTempDir);
Expand Down
Loading