Merged
@@ -59,6 +59,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ThreadLocalRandom;

/**
*
@@ -128,6 +129,7 @@ public void close() throws IOException
     serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath);
     serverProperties.put("zookeeper.session.timeout.ms", "10000");
     serverProperties.put("zookeeper.sync.time.ms", "200");
+    serverProperties.put("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000));

     kafkaConfig = new KafkaConfig(serverProperties);

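About the new "port" property: a random port in the 10000–19998 range makes collisions between concurrently running test brokers unlikely, but two tests can still draw the same number. A common alternative (a sketch only, not part of this PR; the class name is invented) is to let the OS assign a free ephemeral port by binding to port 0:

import java.io.IOException;
import java.net.ServerSocket;

public final class FreePortFinder
{
  private FreePortFinder() {}

  // Binding to port 0 asks the OS for a currently free ephemeral port.
  // The socket is closed immediately, so a small window remains in which
  // another process could claim the port before the broker binds it.
  public static int findFreePort() throws IOException
  {
    try (ServerSocket socket = new ServerSocket(0)) {
      return socket.getLocalPort();
    }
  }
}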
processing/src/main/java/io/druid/segment/IndexMerger.java (26 additions, 17 deletions)
@@ -214,24 +214,8 @@ public File mergeQueryableIndex(
       ProgressIndicator progress
   ) throws IOException
   {
-    // We are materializing the list for performance reasons. Lists.transform
-    // only creates a "view" of the original list, meaning the function gets
-    // applied every time you access an element.
-    List<IndexableAdapter> indexAdapteres = Lists.newArrayList(
-        Iterables.transform(
-            indexes,
-            new Function<QueryableIndex, IndexableAdapter>()
-            {
-              @Override
-              public IndexableAdapter apply(final QueryableIndex input)
-              {
-                return new QueryableIndexIndexableAdapter(input);
-              }
-            }
-        )
-    );
     return merge(
-        indexAdapteres,
+        toIndexableAdapters(indexes),
         rollup,
         metricAggs,
         outDir,
@@ -268,6 +252,26 @@ public Iterable<String> apply(@Nullable IndexableAdapter input)
     );
   }

+  private static List<IndexableAdapter> toIndexableAdapters(List<QueryableIndex> indexes)
+  {
+    // We are materializing the list for performance reasons. Lists.transform
+    // only creates a "view" of the original list, meaning the function gets
+    // applied every time you access an element.
+    return Lists.newArrayList(
+        Iterables.transform(
+            indexes,
+            new Function<QueryableIndex, IndexableAdapter>()
+            {
+              @Override
+              public IndexableAdapter apply(final QueryableIndex input)
+              {
+                return new QueryableIndexIndexableAdapter(input);
+              }
+            }
+        )
+    );
+  }
+
   private static List<String> getLongestSharedDimOrder(List<IndexableAdapter> indexes)
   {
     int maxSize = 0;
@@ -303,6 +307,11 @@ private static List<String> getLongestSharedDimOrder(List<IndexableAdapter> indexes)
     return ImmutableList.copyOf(orderingCandidate);
   }

+  public static List<String> getMergedDimensionsFromQueryableIndexes(List<QueryableIndex> indexes)
+  {
+    return getMergedDimensions(toIndexableAdapters(indexes));
+  }
+
   public static List<String> getMergedDimensions(List<IndexableAdapter> indexes)
   {
     if (indexes.size() == 0) {
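The comment carried into toIndexableAdapters() above is the heart of this cleanup: Lists.transform returns a lazy view, so without materialization a new QueryableIndexIndexableAdapter would be constructed on every element access. A minimal standalone illustration of that behavior (illustrative only; the class name is invented):

import com.google.common.base.Function;
import com.google.common.collect.Lists;

import java.util.List;

public class LazyTransformDemo
{
  public static void main(String[] args)
  {
    List<Integer> source = Lists.newArrayList(1, 2, 3);

    // Lists.transform returns a view: the function runs again on every access.
    List<String> view = Lists.transform(
        source,
        new Function<Integer, String>()
        {
          @Override
          public String apply(Integer input)
          {
            System.out.println("transforming " + input);
            return "value-" + input;
          }
        }
    );

    view.get(0); // prints "transforming 1"
    view.get(0); // prints "transforming 1" again; nothing is cached

    // Materializing runs the function exactly once per element; later
    // accesses are plain array reads.
    List<String> materialized = Lists.newArrayList(view);
    materialized.get(0); // no output
  }
}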
@@ -570,11 +570,9 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink sink)
         tuningConfig.getIndexSpec()
     );

-    QueryableIndex index = indexIO.loadIndex(mergedFile);
-
     DataSegment segment = dataSegmentPusher.push(
         mergedFile,
-        sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
+        sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
     );

     objectMapper.writeValue(descriptorFile, segment);
@@ -920,6 +918,14 @@ public Object apply(@Nullable Object input)
         if (cache != null) {
           cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
         }
+        try {
+          hydrant.getSegment().close();
+        }
+        catch (IOException e) {
+          log.makeAlert(e, "Failed to explicitly close segment[%s]", schema.getDataSource())
+             .addData("identifier", hydrant.getSegment().getIdentifier())
+             .emit();
+        }
       }

       if (removeOnDiskData) {
@@ -42,10 +42,10 @@
 import io.druid.concurrent.TaskThreadPriority;
 import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
-import io.druid.java.util.common.granularity.Granularity;
 import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Pair;
 import io.druid.java.util.common.concurrent.ScheduledExecutors;
+import io.druid.java.util.common.granularity.Granularity;
 import io.druid.query.Query;
 import io.druid.query.QueryRunner;
 import io.druid.query.QueryRunnerFactoryConglomerate;
@@ -424,12 +424,11 @@ public void doRun()
           metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime);
           metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS));

-          QueryableIndex index = indexIO.loadIndex(mergedFile);
           log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier());

           DataSegment segment = dataSegmentPusher.push(
               mergedFile,
-              sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
+              sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
           );
           log.info("Inserting [%s] to the metadata store", sink.getSegment().getIdentifier());
           segmentPublisher.publishSegment(segment);
@@ -861,6 +860,7 @@ protected void abandonSegment(final long truncatedTime, final Sink sink)
       );
       for (FireHydrant hydrant : sink) {
         cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant));
+        hydrant.getSegment().close();
Member:
Despite its documentation, abandonSegment() is not called only from mergeExecutor, so races between persistAndMerge() and abandonSegment() are possible. This should be resolved before merging this change, because it may lead to JVM crashes.

Member:

@kaijianding could you address this?

Contributor Author:

Could you explain more about the race condition? @leventov

Member:

abandonSegment() is called in FlushingPlumber, not from mergeExecutor. It probably doesn't lead to a race, because in the context of FlushingPlumber the mergeExecutor is not used at all, but it's a dangerous situation. Could you please refactor RealtimePlumber/FlushingPlumber by extracting the logic and fields used by both into a superclass, and making RealtimePlumber and FlushingPlumber subclasses of it, so that FlushingPlumber doesn't have unused executor fields?

Contributor Author:

I got your point. You'd like to move the merge and handoff code out of the parent class and into RealtimePlumber, which is the only class that really needs to care about merging and handoff.
I tried the refactoring you describe, but it produces a huge diff that would need careful testing, and I think that's too big a risk for this PR.
Since FlushingPlumber never actually starts the mergeExecutor, there is no race condition here. Should we do the refactoring in another PR? @leventov

Contributor:

IMO, another PR is OK. Or maybe we can even skip it altogether and instead migrate users of Plumbers to Appenderator (which is meant to be an improved replacement).

The flushing plumber isn't really meant to be used in production anyway (I don't think it's even documented). It was meant as a way to set up realtime demos that just throw away data after a period of time.

       }
       synchronized (handoffCondition) {
         handoffCondition.notifyAll();
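For reference, the superclass extraction discussed in the thread above might look roughly like this; it is only a sketch, and every name in it (BasePlumber and both *Sketch classes) is invented rather than taken from the PR:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the proposed refactoring; the real Plumber API
// (sinks, persist, finishJob, ...) is elided.
abstract class BasePlumber
{
  // Machinery genuinely shared by both plumbers, e.g. the persist executor.
  protected final ExecutorService persistExecutor = Executors.newSingleThreadExecutor();

  // Both subclasses abandon segments, but only one of them merges.
  protected abstract void abandonSegment(long truncatedTime);
}

class RealtimePlumberSketch extends BasePlumber
{
  // Merge/handoff machinery lives only here, so coordination between
  // persistAndMerge() and abandonSegment() is confined to this class.
  private final ExecutorService mergeExecutor = Executors.newSingleThreadExecutor();

  @Override
  protected void abandonSegment(long truncatedTime)
  {
    // Would synchronize with in-flight merges before closing segments.
  }
}

class FlushingPlumberSketch extends BasePlumber
{
  // No mergeExecutor field at all: abandonSegment() here cannot race with
  // a merge, because this plumber never merges.
  @Override
  protected void abandonSegment(long truncatedTime)
  {
    // Would close hydrant segments directly.
  }
}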