
Support replaceExisting parameter for segments pushers #5187

Merged: gianm merged 3 commits into apache:master from dclim:pusher-overwrite, Jan 4, 2018

Conversation

@dclim (Contributor) commented Dec 20, 2017

AppenderatorImpl (used by Kafka indexing and local indexing) uses replaceExisting=true, while RealtimePlumber uses replaceExisting=false. See #5161 for details on the issue.

Fixes #5161.

This PR breaks compatibility with implementations of DataSegmentPusher. The signature for:

DataSegment push(File file, DataSegment segment) throws IOException

has changed to:

DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException;

* @return segment descriptor
* @throws IOException
*/
DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException;
Contributor:

This change is incompatible, please either mark the PR as incompatible or make it compatible (probably by making this new push a default method that just calls the original push). imo, marking as incompatible is the way to go, especially given that 0.12.0 is the next version.
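For illustration, a minimal sketch of the backward-compatible alternative mentioned here (using a stub in place of Druid's real DataSegment class): the new three-argument push could be given a default body that delegates to the original signature, so existing implementations keep compiling.

```java
import java.io.File;
import java.io.IOException;

// Stub standing in for Druid's DataSegment, just to keep the sketch self-contained.
class DataSegment {}

interface DataSegmentPusher
{
  DataSegment push(File file, DataSegment segment) throws IOException;

  // Hypothetical compatibility shim: existing implementations keep compiling,
  // but replaceExisting is silently ignored -- the very ambiguity this PR
  // chose to eliminate by breaking the interface instead.
  default DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException
  {
    return push(file, segment);
  }
}
```

The PR opts for the incompatible change instead, so that every implementer must decide explicitly what replaceExisting means for their deep storage.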

Contributor Author:

Agreed. I was debating whether to make it backwards-compatible, but given the issues that can arise when overwrite behavior is not explicitly defined, I thought it better to force implementers to acknowledge the new flag.

What needs to be added (and where) to document it as being incompatible?

Contributor:

Just adding the label "Incompatible" and updating the main PR description is enough, it will then get noticed by whoever writes the release notes.

s3Client.getObjectDetails(bucketName, objectKey);
return true;
}
catch (ServiceException e) {
Contributor:

This should be S3Utils.isObjectInBucket(s3Client, bucketName, objectKey), which also does more fine grained error checking.

Contributor Author:

Awesome, that's what I was looking for, thanks!

.withBinaryVersion(SegmentUtils.getVersionFromDir(fileToUpload));

-dataSegmentPusher.push(fileToUpload, segmentToUpload);
+dataSegmentPusher.push(fileToUpload, segmentToUpload, false);
Contributor:

This deserves a comment about why it is false.

// version, but is "newer" than said original version.
DataSegment updatedSegment = segment.withVersion(StringUtils.format("%s_v%s", segment.getVersion(), outVersion));
-updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment);
+updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment, true);
Contributor:

This deserves a comment about why it is true.


// Upload file
-final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment);
+final DataSegment uploadedSegment = toolbox.getSegmentPusher().push(fileToUpload, mergedSegment, true);
Contributor:

This deserves a comment about why it is true.

mergedFile,
-sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
+sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),
+true
Contributor:

This deserves a comment about why it is true.

Contributor:

Actually, it would be nicer if this option came from the caller, since eventually it should be used by others too, not just local indexing and Kafka indexing.

Contributor:

I agree, although I think it's not necessary for this PR as it may involve more substantial refactoring, and here we just want to fix the bug in #5161. But someone might want to do it one day.

Contributor:

That sounds alright. It would be good, however, to leave a comment in the javadocs of this class reminding devs of this special handling, and cautioning that it needs to be modified if AppenderatorImpl is used in other cases in the future.

Contributor Author:

sounds good, will add a comment to the javadoc

mergedFile,
-sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes))
+sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)),
+false
Contributor:

This deserves a comment about why it is false.

return jsonMapper.readValue(new File(outDir, "descriptor.json"), DataSegment.class);
if (replaceExisting) {
try {
FileUtils.cleanDirectory(outDir);
Contributor:

This has a race where it potentially deletes data, if the process crashes in the middle of FileUtils.cleanDirectory. It would be bad in the case where the first kafka replica succeeds, and the second one crashes here and deletes its data. So it'd be better to overwrite the files one-by-one, by first writing them to disk as tmp files, and then moving those tmp files on top of the previously existing files. Ideally the files should be force()ed before being moved, since then we confirm they are fsynced to disk, and aren't left with another possible source of corruption after a crash. I would want to fsync the directory too but I don't know how to do that in Java…
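The write-tmp-then-fsync-then-rename pattern being suggested can be sketched roughly like this (names are illustrative, not the PR's actual code):

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public final class SafeOverwrite
{
  // Write to a temp file in the same directory as the target, fsync it, then
  // atomically rename it over any existing file. A crash mid-write leaves the
  // old file intact; a crash after the rename leaves the new file intact.
  public static void writeAtomically(Path target, byte[] contents) throws IOException
  {
    // Temp file must live on the same filesystem as the target for the rename to be atomic.
    Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
    try (FileOutputStream out = new FileOutputStream(tmp.toFile())) {
      out.write(contents);
      // fsync before the rename so the filesystem can't reorder the rename
      // ahead of the data blocks and leave a zero-length target after a crash.
      out.getChannel().force(true);
    }
    Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
  }
}
```

With this pattern there is no window in which the previously pushed segment files have been deleted but the new ones are not yet durable.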

Contributor Author:

Good catch, thank you

@gianm added the Bug label Dec 20, 2017

// fsync needs to happen before the FileOutputStream is closed by the try-with-resources
if (out instanceof FileOutputStream) {
((FileOutputStream) out).getChannel().force(true);
Contributor:

This is weird :)
zipOut.flush() is called just before, which calls out.flush(), which is a no-op for FileOutputStream (and rightly so, per the explanation in https://stackoverflow.com/questions/31456660/fileoutputstream-does-the-close-method-calls-also-flush ).
Have you seen this failing in some scenario/test, or are we being extra careful?

Contributor Author:

No, I haven't seen any failures here; it was just based on Gian's suggestion to fsync before moving the file, to minimize the potential for issues. Agreed the construct isn't great :)

Contributor:

@gianm curious, have you experienced any problem that happens without this change ?

Contributor:

@himanshug it varies by filesystem; some are susceptible to a problem where write-then-rename-then-crash will get you mysterious zero-length files, where you once had perfectly good normal files. One example is early versions of ext4. Another is (I think) still current versions of xfs, see this thread: https://marc.info/?l=linux-xfs&m=139864441911660&w=2

See https://www.kernel.org/doc/Documentation/filesystems/ext4.txt for some commentary in the ext4 docs. You can see how the superior kernel programmers feel about us poor, dumb application programmers :)

auto_da_alloc(*)	Many broken applications don't use fsync() when 
noauto_da_alloc		replacing existing files via patterns such as
			fd = open("foo.new")/write(fd,..)/close(fd)/
			rename("foo.new", "foo"), or worse yet,
			fd = open("foo", O_TRUNC)/write(fd,..)/close(fd).
			If auto_da_alloc is enabled, ext4 will detect
			the replace-via-rename and replace-via-truncate
			patterns and force that any delayed allocation
			blocks are allocated such that at the next
			journal commit, in the default data=ordered
			mode, the data blocks of the new file are forced
			to disk before the rename() operation is
			committed.  This provides roughly the same level
			of guarantees as ext3, and avoids the
			"zero-length" problem that can happen when a
			system crashes before the delayed allocation
			blocks are forced to disk.

In this case, where the local deep storage pusher is probably writing to a network mounted filesystem, it's not really relevant what local fses like ext4 and xfs do, but I think it's still good to be safe given that people writing filesystem drivers have opinions like the above.

Contributor:

@gianm thanks for the research. The ext4 doc gives some hope that it automatically tries to handle the missing-fsync() scenario, but for NFS, who knows.
It would be good to leave a comment in the code pointing to this thread.

Contributor (@gianm) left a review comment:

A couple of minor suggestions. I think neither is blocking, and both should be pretty straightforward.


FileUtils.forceMkdir(outDir);
if (replaceExisting) {
Files.move(
Contributor:

Better to use renameTo on the actual file objects. Files.move is a convenience method that falls back to copy-then-delete when rename fails (possibly because the files are on different filesystems). But we really don't want that. It would defeat the purpose of using a temp file and then renaming it (atomicity).

In this case, it shouldn't matter, since we "know" that the files are on the same filesystem anyway. But I think it's best to encode that knowledge in our choice of function used to rename this file.
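As a sketch of the distinction (illustrative helper, not the PR's actual code): File.renameTo never copies, it simply reports failure via its return value, while NIO's ATOMIC_MOVE option makes Files.move throw rather than silently degrading into copy-then-delete.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;

public final class RenameOnly
{
  // Rename src over dest without ever degrading into copy-then-delete.
  public static void rename(File src, File dest) throws IOException
  {
    // renameTo fails (returns false) if a plain rename isn't possible,
    // e.g. when src and dest are on different filesystems.
    if (!src.renameTo(dest)) {
      throw new IOException("Failed to rename " + src + " to " + dest);
    }
  }

  public static void renameNio(File src, File dest) throws IOException
  {
    // Equivalent intent via NIO: ATOMIC_MOVE throws
    // AtomicMoveNotSupportedException instead of falling back to a copy.
    Files.move(src.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE);
  }
}
```

Either way, the choice of API encodes the assumption that source and destination live on the same filesystem.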

Contributor Author:

@gianm I'm not sure how to use File.renameTo(File) to accomplish this since that method doesn't support overwriting existing files.

Contributor:

@dclim renameTo says it might not succeed if the destination already exists, but you can trust that it will, at least on a POSIX system, which is the only kind we support anyway.

// Workaround for http://hg.openjdk.java.net/jdk8/jdk8/jdk/rev/759aa847dcaf
zipOut.flush();

// fsync needs to happen before the FileOutputStream is closed by the try-with-resources
Contributor:

This is going to get called in a few places where it isn't necessary (like the S3 pusher). It'd be better to only fsync when necessary. Add an option?

Contributor:

We should probably move this logic to the

 public static long zip(File directory, File outputZipFile) throws IOException

method in the same class, which passes the FileOutputStream. Also add a boolean fsync flag to that method, which would be true only for the local pusher.

Contributor Author:

@himanshug ah, I'm not sure I was clear about why I put it in this method instead of public static long zip(File directory, File outputZipFile) throws IOException and used the ugly instanceof: you can't sync a FileOutputStream after it has been closed, and the try-with-resources for the ZipOutputStream here closes the underlying FileOutputStream before the method returns. So doing it in public static long zip(File directory, File outputZipFile) throws IOException would result in a 'bad file descriptor' exception.

The two options I can think of are to do what I did and fsync before the streams are closed, or alternatively, to remove the try-with-resources for the ZipOutputStream and not call close on that stream. This might actually not be as bad an option as I first thought, since ZipOutputStream.close() doesn't do much other than close the underlying stream and prevent new files from being written to the stream. In that case, it would be the caller's responsibility to close the stream they passed in, which I think is perfectly reasonable (and which callers are already doing). What do you think?
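A rough sketch of the caller-owns-the-stream approach described here (hypothetical helper, not the PR's actual code): ZipOutputStream.finish() writes the zip trailer without closing the underlying stream, so the FileChannel is still open when force() runs.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public final class ZipHelper
{
  // Caller owns `out` and is responsible for closing it after this returns.
  public static void zipFile(File input, FileOutputStream out, boolean fsync) throws IOException
  {
    ZipOutputStream zipOut = new ZipOutputStream(out);
    zipOut.putNextEntry(new ZipEntry(input.getName()));
    Files.copy(input.toPath(), zipOut);
    zipOut.closeEntry();
    zipOut.finish(); // writes the zip central directory; does NOT close `out`
    zipOut.flush();
    if (fsync) {
      // Valid here because `out` hasn't been closed yet.
      out.getChannel().force(true);
    }
  }
}
```

This avoids both the instanceof check and the try-with-resources problem, at the cost of making stream ownership the caller's concern.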

Contributor:

I see, I agree with the reasoning of not using try-with-resources and let callers be responsible for closing their streams.

@gianm added this to the 0.12.0 milestone Jan 3, 2018
@gianm merged commit a7967ad into apache:master Jan 4, 2018
gianm pushed a commit to gianm/druid that referenced this pull request Jan 4, 2018
* support replaceExisting parameter for segments pushers

* code review changes

* code review changes
jihoonson pushed a commit that referenced this pull request Jan 4, 2018
* support replaceExisting parameter for segments pushers

* code review changes

* code review changes