Closed
@@ -189,6 +189,15 @@ File persist(
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException;

File mergeSegmentFiles(
Member: Please add javadoc with the rationale for this method, and the difference from the most closely related one, mergeQueryableIndex().

File[] segmentFiles,
boolean rollup,
AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException;
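A javadoc sketch for the method above, along the lines the reviewer requests; the wording is my reading of the rationale from the PR discussion, not the author's final text:

```java
/**
 * Merges the given on-disk segment directories into a single segment under
 * outDir. Unlike {@link #mergeQueryableIndex}, which operates on indexes the
 * caller has already loaded (and therefore does not close), this method loads
 * a fresh QueryableIndex per file and closes each one when the merge
 * completes, releasing the mmap added by the merge itself.
 */
```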

File mergeQueryableIndex(
List<QueryableIndex> indexes,
boolean rollup,
@@ -798,6 +798,40 @@ public File persist(
);
}

@Override
public File mergeSegmentFiles(
File[] segmentFiles,
boolean rollup,
AggregatorFactory[] metricAggs,
File outDir,
IndexSpec indexSpec,
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
) throws IOException
{
// Load the indexes eagerly: Lists.transform returns a lazy view, which would
// re-open each index on every iteration (including the close loop below) and
// leak the instances opened during the merge.
List<QueryableIndex> indexes = Lists.newArrayListWithCapacity(segmentFiles.length);
for (File segmentFile : segmentFiles) {
try {
indexes.add(indexIO.loadIndex(segmentFile));
}
catch (IOException e) {
throw new ISE(e, "Failed to load index from %s", segmentFile.getAbsolutePath());
}
}

try {
return mergeQueryableIndex(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator(), segmentWriteOutMediumFactory);
}
finally {
for (QueryableIndex index : indexes) {
Member: Use Closer to close many objects.
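A minimal, self-contained sketch of the pattern the reviewer means; Guava's com.google.common.io.Closer (already on Druid's classpath) provides this, and the MiniCloser stand-in below only illustrates its semantics:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Stand-in for Guava's Closer: register many Closeables, close them all in
// reverse order, and keep the first failure while suppressing later ones.
final class MiniCloser implements Closeable {
  private final Deque<Closeable> stack = new ArrayDeque<>();

  <C extends Closeable> C register(C closeable) {
    stack.push(closeable);
    return closeable;
  }

  @Override
  public void close() throws IOException {
    IOException failure = null;
    while (!stack.isEmpty()) {
      try {
        stack.pop().close();
      }
      catch (IOException e) {
        if (failure == null) {
          failure = e;
        } else {
          failure.addSuppressed(e);
        }
      }
    }
    if (failure != null) {
      throw failure;
    }
  }
}

public class CloserDemo {
  public static void main(String[] args) throws IOException {
    List<String> closed = new ArrayList<>();
    try (MiniCloser closer = new MiniCloser()) {
      closer.register((Closeable) () -> closed.add("index-1"));
      closer.register((Closeable) () -> closed.add("index-2"));
      // ...the merge itself would run here...
    }
    // Registered resources are closed in reverse order, even if one throws.
    System.out.println(closed); // prints [index-2, index-1]
  }
}
```

With this pattern, the hand-written close loop and its per-index try/catch collapse into a single closer.close() in the finally block.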

try {
index.close();
}
catch (Exception e) {
log.warn(e, "Failed to close index after merge");
}
}
}
}

@Override
public File mergeQueryableIndex(
List<QueryableIndex> indexes,
@@ -82,6 +82,7 @@
import javax.annotation.Nullable;
import java.io.Closeable;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
@@ -716,8 +717,20 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink
closer.register(segmentAndCloseable.rhs);
}

mergedFile = indexMerger.mergeQueryableIndex(
indexes,
Member: @kaijianding I agree with @clintropolis; maybe this PR is incomplete and you planned to free those indexes? In its current form, the PR doesn't free any extra resources that were (probably unnecessarily) held before by the worker process.

Contributor Author (@kaijianding), Dec 7, 2018:
The QueryableIndexes are always new objects loaded from file, and they are closed when IndexMerger.mergeSegmentFiles() is done; this frees the mmap usage added during the merge process.
Currently RealtimePlumber uses indexMerger.mergeQueryableIndex(indexes), and those indexes are not closed to release the added mmap because they are still in use by queries. The mmap usage therefore keeps growing until abandonSegment() is called, which only happens once handoff succeeds.
If handoff is slow or the coordinator is not working properly, the mmap usage keeps increasing; this is a problem.

This PR separates the QueryableIndexes used by queries from the ones used by the merge process, so we can close the merge-time QueryableIndexes to release their mmap and leave the query-time QueryableIndexes untouched.

This PR has been verified in my production environment; it indeed keeps the mmap usage under control.

Member:
Again, it's beyond me how just creating (and later closing) some new objects, without revoking any other objects, could improve anything, unless some code around here is lazy, which doesn't seem to be the case.

Member:
If this is about avoiding keeping some memory-mapped files resident in memory (although I don't see the mechanism by which that helps either), at the very least the surrounding try {} block should be refactored, because it currently couples the creation of the indexes list with mergeSegmentFiles() for no apparent reason.

Contributor Author (@kaijianding):

Will address these comments.

The mechanism is like this:

  1. mmap usage increases after indexes are merged; it can only be released by calling close() on the indexes.
  2. Currently, close() on the indexes is not called until abandonSegment(), which can be delayed for a very long time if handoff is slow. Until handoff succeeds, the mmap usage keeps growing.
  3. To avoid this growth, I create new QueryableIndex objects and close them after each merge succeeds or fails, so the mmap added during the merge process is released.
  4. I think this works because even if a file is mmapped multiple times, each mmap() call is accounted for separately and can be unmapped separately. Here, close() on a QueryableIndex is the unmap action.

I hope this explains the mechanism clearly. Though this PR is very simple, it indeed helps with mmap usage.
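Point 4 can be demonstrated in plain Java: two FileChannel.map() calls on the same file produce two independent mappings, and dropping one leaves the other intact. (Class and file names here are illustrative; Druid's real unmapping goes through its own utilities rather than GC.)

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DoubleMapDemo {
  public static void main(String[] args) throws IOException {
    Path segment = Files.createTempFile("segment", ".bin");
    Files.write(segment, new byte[]{1, 2, 3, 4});
    try (FileChannel ch = FileChannel.open(segment, StandardOpenOption.READ)) {
      // Each map() call is a separate mmap() at the OS level, accounted for
      // and releasable independently of the other.
      MappedByteBuffer forQueries = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
      MappedByteBuffer forMerge = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
      // Same underlying bytes, but two distinct mappings.
      System.out.println(forQueries.get(0) == forMerge.get(0)); // prints true
      forMerge = null; // dropping the merge-time mapping leaves forQueries usable
      System.out.println(forQueries.get(3)); // prints 4
    }
    Files.delete(segment);
  }
}
```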

There is an alternative way to reclaim mmap: load the mergedFile as a QueryableIndex, swap this single big QueryableIndex into the Sink in place of all the hydrants' small QueryableIndexes, then close the small QueryableIndexes and delete the hydrant segments to release their mmap (the mmap usage has already grown by the time the merge is done).

Contributor Author (@kaijianding), Dec 13, 2018:
We are on the same page now; your understanding is totally correct.

In the worst case, where the complete set of data is queried (all rows, all columns), the footprint is exactly the same as the merge required in the first place.
But usually (as I've noticed in my production environment) very little mmap is added by queries; many columns are ingested but never queried at all, so that part of the mmap is not a problem. And if we do the final swap after the merge, that part of the mmap will shrink, because the newly merged segment has no mmap footprint yet. In most cases users tend to query the latest data, so when handoff is slow it's better to do the final swap to reduce the mmap held by the earlier hydrants' segments.

Back to this PR: it releases mmap after the merge is done, so the mmap usage stays under control and won't grow to a huge number when handoff is slow, which could otherwise get the process killed by YARN.

Member:
Cool, glad we could sort out what is going on, and apologies it took so long for me to understand what the point was 👍

Since this doesn't really seem to have any particular negative consequences, and since it is doing something useful in at least some cases, I'll have another look at this PR.

Also, in JDK 10+ I believe it will be possible to open files with O_DIRECT, which is probably what we really want here if we want merges to be done out of the query path without a significant impact on page-cache usage, though it would potentially lower merge performance. I think it would be worth putting a note and adding this link so our future selves remember to consider it if we ever make it out of Java 8.

Member:
@clintropolis it's possible to use O_DIRECT already: obtain a raw pointer from mmap() and wrap it as https://github.com/DataSketches/memory.

Contributor:
So my understanding is that this is useful when, on a realtime process, only a very small subset of the columns/data in the intermediate persisted segments is being read by queries.
Some commentary in the code along those lines would be great.
Overall, this patch looks useful to me.

"Swap after merge" would further help reduce the inode count, but the swap would be tricky to implement, as there might be queries underway on the indexes just merged.

I think O_DIRECT is a whole other beast and would require significant performance regression testing in this case.

Member:
Hmm, this got marked stale, but I also think it would be OK to merge if @leventov's comments were addressed and some additional javadocs explaining what is going on (and linking to this discussion) were added. Do you have any interest in finishing this, @kaijianding?

Apologies that it got stalled in review for so long.

File[] hydrantDirs = persistDir.listFiles(
Member:
listFiles() is an unsafe API; it may return null. Use Files.list() instead. Given that, it's also more convenient to change the new IndexMerger method's parameter type to List<Path>.

new FilenameFilter()
{
@Override
public boolean accept(File dir, String fileName)
{
// To avoid reading and listing of "merged" dir
Member: The comment is unclear, please elaborate.

Member: "hydrantFiles" maybe? Or are those files actually directories? Then IndexMerger.mergeSegmentFiles() should be called mergeSegmentDirs().

return Ints.tryParse(fileName) != null;
}
}
);
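A sketch of the reviewer's Files.list() suggestion; the class name HydrantDirs and the helper listHydrantDirs are hypothetical, and the numeric-name filter mirrors the Ints.tryParse check in the diff using plain Java:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class HydrantDirs {
  // Files.list() throws IOException on failure instead of returning null,
  // and yields Paths directly, matching the suggested List<Path> parameter.
  static List<Path> listHydrantDirs(Path persistDir) throws IOException {
    try (Stream<Path> entries = Files.list(persistDir)) {
      return entries
          .filter(Files::isDirectory)
          .filter(p -> isInteger(p.getFileName().toString())) // skips "merged"
          .sorted()
          .collect(Collectors.toList());
    }
  }

  // Plain-Java stand-in for Guava's Ints.tryParse(name) != null.
  private static boolean isInteger(String s) {
    try {
      Integer.parseInt(s);
      return true;
    }
    catch (NumberFormatException e) {
      return false;
    }
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("persist");
    Files.createDirectory(dir.resolve("0"));
    Files.createDirectory(dir.resolve("1"));
    Files.createDirectory(dir.resolve("merged"));
    System.out.println(listHydrantDirs(dir).size()); // prints 2
  }
}
```

Extracted as a shared helper, this would also address the later review note about the fragment being duplicated in two plumbers.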

mergedFile = indexMerger.mergeSegmentFiles(
hydrantDirs,
schema.getGranularitySpec().isRollup(),
schema.getAggregators(),
mergedTarget,
@@ -430,8 +430,20 @@ public void doRun()
closer.register(segmentAndCloseable.rhs);
}

mergedFile = indexMerger.mergeQueryableIndex(
indexes,
File[] hydrantDirs = persistDir.listFiles(
new FilenameFilter()
Member: Consider extracting the repeated fragment as a method.

{
@Override
public boolean accept(File dir, String fileName)
{
// To avoid reading and listing of "merged" dir
return Ints.tryParse(fileName) != null;
}
}
);

mergedFile = indexMerger.mergeSegmentFiles(
hydrantDirs,
schema.getGranularitySpec().isRollup(),
schema.getAggregators(),
mergedTarget,