Use unique segment paths for Kafka indexing #5692
gianm merged 6 commits into apache:master from dclim:unique-segments
Conversation
What do you think about receiving a suffix as a parameter and making the caller responsible for passing a unique suffix (like the number of task attempts)? It would be safer than a short random UUID.
Hm, I can go either way on that. I actually think it would be less safe to have the suffix passed as a parameter, since a caller may not in general understand all the possible failure modes and may provide a non-random suffix that doesn't actually help the issue, and also may not know which characters are valid for the suffix on a given filesystem (though we could validate for this). The advantage would be that all segments generated by a particular task could be tagged with the same prefix for tracking, if that's something interesting to us. A UUID chopped to 5 characters still has over 1 million possibilities, so I'm not too worried about collisions.
uploadBlob() still has the replaceExisting parameter and this is always true. Is this intentional?
It was intentional, yes, but now I'm thinking it might be better to just remove it. Honestly, I don't even know how effective 'don't overwrite' semantics that depend on an 'if object exists' check are for each deep storage implementation, since they are generally eventually consistent (which is why I retained overwrite behavior for most of them). I'll remove.
Would you add a comment noting that useUniquePath is used in push() instead of here?
Also, please add a check that useUniquePath is always false.
Please add a check that useUniquePath is always false.
I opted to remove useUniquePath from here and left a comment that this is only used by Hadoop indexing and never uses unique paths.
gianm left a comment
In addition to the individual line comments, DataSegmentFinders should be updated too. Clearly, we cannot know which segment to insert if there are multiple options. I think in that case we should log a warning and take the newest one.
The "insert-segment-to-db.md" doc needs to make it very clear that it is not necessarily going to be a perfect import, and that it is a tool only to be used as a last resort.
Could you add some javadocs for kill and killQuietly? It's semi-obvious what the difference is, but IMO not obvious enough to avoid docs.
But do this for 100,000 segments and all of a sudden the odds are high that there will be a collision for some segment. The birthday problem strikes again!
Embrace the full UUID.
Well, for there to be a collision, it has to happen as part of one of the failure scenarios, meaning that old shards from jobs run a long time ago aren't going to be involved in any collisions, even if there are millions of them. The scenarios would be things like task replicas both pushing to S3 at the same time, or a Kafka task failing after pushing or publishing and the replacement task pushing the same segment IDs, and in those cases, you'd have to have configured something really wrong to be pushing 100k segments from an indexing task.
But if you think it's still safer, I'm okay with using the full UUID; just trying to do my part to keep people from hitting their PATH_MAX or whatever on their file system.
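The birthday-problem odds being debated here can be checked with a quick back-of-the-envelope calculation. This is an illustrative standalone program, not Druid code; it assumes a suffix of 5 hex characters drawn from a UUID, giving 16^5 (about 1.05 million) possible values:

```java
public class SuffixCollisionOdds {
    public static void main(String[] args) {
        // 5 hex characters from a UUID -> 16^5 possible suffixes.
        final double space = Math.pow(16, 5);
        // Birthday-problem approximation for the probability of at least one
        // collision among n independently chosen suffixes:
        //   P(collision) ≈ 1 - exp(-n(n-1) / (2 * space))
        for (int n : new int[] {100, 1_000, 100_000}) {
            double p = 1.0 - Math.exp(-((double) n * (n - 1)) / (2.0 * space));
            System.out.printf("n=%,d -> P(collision) ≈ %.4f%n", n, p);
        }
    }
}
```

With only a hundred concurrent suffixes the odds are well under 1%, but at 100,000 a collision is essentially certain, which is the point gianm is making.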
I'm thinking of the situation where you have two replicas and they will always push to S3 around the same time (which seems likely). Even in a non-failure scenario there will still be two pushes going on at once. In that case it's likely that some pair of replicas will collide with each other.
I'm thinking of the situation where you have two replicas and they will always push to S3 around the same time (which seems likely).
Ah, I missed this case. If so, what do you think about receiving a suffix as a parameter and making the caller responsible for passing a unique suffix, as commented above?
I'd generally prefer random instead of something like an attempt id. It's because with attempt ids, you have to worry about not assigning two tasks the same attempt id by accident, and we don't have a concept like this right now. With big random numbers you know for sure that you are ok, without needing to think about it too hard.
Okay sounds good to me
It looks like this is OK because this method will only be called by Hadoop M/R indexing, which won't set useUniquePath. It would be good to note that in a comment, so people don't get confused looking at this.
I don't think HdfsDataSegmentPusher.getStorageDir() is just called for Hadoop M/R; it's only called from HdfsDataSegmentPusher.push() (which can be used by anything pushing to HDFS as deep storage). But HdfsDataSegmentPusher.push() will always set this to false, since any 'uniqueness' is applied not to the directory but to the filename along with the shard (IIRC this was because excessive directories in HDFS caused performance issues).
Ah, I meant it's only called directly by Hadoop M/R. Anyway this is great analysis - could you please include it as a comment?
What happens if a deep storage pusher impl doesn't respect useUniquePath? This will likely happen for extensions whose authors don't notice the signature change here. It's probably acceptable if what happens is you just get the old behavior.
Well, this is a little janky, but note that the signature didn't really change; the meaning of that last boolean parameter did. At first I wanted to force a signature change so that implementors would have to acknowledge the change, but as discussed, decided to make it 'backward-compatible' for the point release. So implementors who don't notice the change will get 'replaceExisting=true' behavior for Kafka indexing tasks and 'replaceExisting=false' for all other task types, which seemed reasonable to me (since replaceExisting=true was added primarily for the Kafka indexing task type as well).
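The "same signature, new meaning" compromise described here can be sketched as follows. The names and signatures below are hypothetical, for illustration only; they are not Druid's actual DataSegmentPusher interface:

```java
// Hypothetical sketch, not Druid's real interface.
interface SegmentPusher {
    // The boolean in this position used to mean replaceExisting; it now
    // means useUniquePath. Because the positional signature is unchanged,
    // extensions compiled against the old meaning still compile and link.
    String push(String baseDir, boolean useUniquePath);
}

class LegacyPusher implements SegmentPusher {
    // An extension author who never noticed the semantic change ignores
    // the flag and writes to the base path, i.e. old overwrite behavior.
    // Callers arrange for this to be tolerable: Kafka indexing tasks pass
    // true, all other task types pass false.
    @Override
    public String push(String baseDir, boolean useUniquePath) {
        return baseDir;
    }
}
```

The design trade-off is that a semantic change behind a stable signature degrades gracefully (old behavior) instead of breaking compilation, at the cost of being easy to overlook.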
@dclim not sure how someone would read this. Can you please provide some guidance on how to find those leaked segments and delete them manually?
@b-slim right now, it would involve some scripting, something like: list out the segment directory and find any partition directories that have more than one child directory, and then compare these directory names to the druid_segments table and delete any directory that isn't referenced in the loadSpec of an entry in that table. You'd probably also want to check timestamps and only process dangling segments from some time in the past so you don't inadvertently wipe segments that are pushed but not published.
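The scripting dclim describes can be sketched for a single partition directory. This is a hypothetical illustration (the directory layout and names are assumptions, not Druid's actual structure), and as noted above a real script should also check timestamps so it doesn't delete segments that are pushed but not yet published:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LeakedSegmentFinder {
    /**
     * Given one partition directory, return child directories that are not
     * referenced by any loadSpec in the druid_segments table (the caller is
     * expected to have extracted those referenced names from the metadata
     * store). Partitions with at most one child cannot contain a dangling
     * copy, so they are skipped.
     */
    public static List<Path> findLeaked(Path partitionDir, Set<String> referencedDirNames) throws IOException {
        List<Path> children;
        try (Stream<Path> s = Files.list(partitionDir)) {
            children = s.filter(Files::isDirectory).collect(Collectors.toList());
        }
        if (children.size() <= 1) {
            return new ArrayList<>();
        }
        return children.stream()
            .filter(c -> !referencedDirNames.contains(c.getFileName().toString()))
            .collect(Collectors.toList());
    }
}
```

Running this over every partition directory in the segment root, and intersecting with a cutoff on modification time, approximates the manual cleanup described in the thread.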
@dclim this seems pretty complicated to me; imagine a user who has just adopted Druid.
@b-slim @dclim how about making one change in this patch: kill segments in the "Our segments really do exist, awaiting handoff" path. Then, tasks will clean up after themselves, and the only way 'loose' segments would be lying around is if tasks die before they can clean up. And this can already lead to loose segments even today (if a segment is pushed but not published, it stays in deep storage forever). So it'd be just as good as what we have today.
@dclim and @gianm I am not asking for this as part of this patch, but is it possible to have a persisted pointer (maybe as part of the segment allocations table)? This pointer could be a path to a directory based on a transaction id, used to track committed/aborted handoffs and clean up any aborted segments afterward, even if the task dies before cleaning up after itself.
@gianm changes made, thanks for the review, guys. Good catch on killing segments at "Our segments really do exist, awaiting handoff"; I meant to do that but missed it and only caught the exception case. @b-slim that seems reasonable to me; I think it makes sense, though, to wait and see if anyone complains about this. I think the cases where it will leave garbage are actually quite rare, and not much more frequent than in the current implementation where tasks overwrite each other's segments.
gianm left a comment
@dclim The code looks good to me, but please update the insert-segment-to-db.md doc. It needs to make it clear that it has a risk of breaking the exactly-once guarantees, and the preferred method of migrating data is to migrate the metadata store dump as well.
@gianm added docs; also refactored common Finder code into a base class.
@dclim can you please add a short section about when the leak can occur and how to find leaked segments? Thanks
IMO, the implementations would be less convoluted if the helpers in this file were static utility methods rather than inherited superclass methods. I'm in the school of thought where inheritance is best used when nothing else really makes sense (neither composition nor utility methods).
In particular, I feel that the pattern in play here -- an abstract class implements a method from an interface and then creates a new method for its subclasses to implement -- is usually more difficult to understand than utility methods that can be called directly by the potential subclasses. It breaks the connection between the subclasses and the interface they implement, and requires the reader to understand the link established in the abstract class too.
I won't hold up the patch over this though.
@gianm since it is a Kafka indexing issue I think we can add it to the new Kafka ingestion page, maybe as a section.
I'm ok with a section in the docs, just wondering what you were thinking. If we add one I don't think it needs to be too alarmist, because with the cleanup-after-nonpublishing feature in this patch, there shouldn't be too many unused segment files lying around. Probably not more than already existed with Kafka indexing.
@gianm yeah, as I said it is to help people operating the cluster (and help me know where to dig without reading the code base), and it might be good to have it as an issue that someone can tackle if they want to.
@gianm I agree with the base class making things more convoluted and refactored it; please check the last commit again when you get a chance
@gianm @b-slim I took another look through the code, and with the change Gian suggested to remove segments if we discover someone already published them before us, I can't find any real scenario in which garbage would be left behind, other than bugs in Druid's code, such as these sanity checks tripping. In those cases, if an exception was thrown after some segments had already been pushed, those segments would be orphaned. But my point here is that this is not something related to the changes in this patch at all.

After looking at it more closely, I'm leaning towards the opinion that it would actually cause more harm than good to explicitly call it out in documentation, since it would scare people unnecessarily. Folks who view this as a major issue should already have been running periodic consistency checks comparing deep storage to the metadata storage. I think it might be interesting to write a tool, sort of like a 'druid-fsck', that can validate the consistency of deep storage and clean up orphans, but that would be for another time.
Wait, I won't merge this, since there's a checkstyle error showing up in Travis. @dclim could you please take a look?
Travis is looking good; merging.
thanks @gianm
@dclim I tagged this 0.12.1 since I brought up on the dev list the prospect of including this there, and nobody argued. Does that sound reasonable to you & are you able to do a backport?
@gianm yes, that sounds good, will backport
* support unique segment file paths
* forbiddenapis
* code review changes
* code review changes
* code review changes
* checkstyle fix
#5187 was intended to fix an issue with Kafka indexing service where a task would push segments to deep storage and then fail, and then the subsequent retry task would attempt to push its segment (which contains a different set of offsets from the first task) and publish the metadata, but the push would not happen because the segment data already existed in deep storage. In this scenario, the metadata (most importantly the offset cursors) would not match what's actually in the segment and exactly-once semantics would be violated.
This was fixed by supporting file overwriting in deep storage, but it turns out that there are other cases where overwriting the existing segment is undesired behavior. As an example (which may or may not be happening in practice), there could be a task which pushes its segments and publishes metadata but then fails before it can notify the supervisor that it is done. If the supervisor then creates a retry task to attempt again, the task will eventually fail when the transactional metadata commit fails, but by this point it will have already pushed its version of the segments into deep storage, overwriting the good set of segments there. This will lead to a similar situation as the previous case, where the same segment ID could have different data on historicals and in deep storage, and where Kafka messages can be duplicated or missed.
The cleanest way to handle all these cases is to have each Kafka task write its segments to a separate location in deep storage so that the metadata published definitely matches the segment set and there's no interaction between tasks. If a task pushes and then the metadata publish fails, it will attempt to clean up the orphaned segments, but it is expected that in some unhandled exception cases, the segments may remain in deep storage and may need to be cleaned up manually.
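The "separate location in deep storage per task" idea can be sketched as a small path helper. This is a hypothetical illustration of the approach, not Druid's actual path-building code, and the path components used here are assumptions:

```java
import java.util.UUID;

public class UniqueSegmentPaths {
    /**
     * Hypothetical sketch: when useUniquePath is set (as for Kafka indexing
     * tasks), append a random suffix so a retry task never overwrites the
     * segments already pushed by an earlier attempt for the same segment ID.
     * The loadSpec recorded in the metadata store points at whichever path
     * was actually published; unreferenced paths are the orphans that may
     * need manual cleanup.
     */
    public static String storageDir(String dataSource, String interval, String version,
                                    int partitionNum, boolean useUniquePath) {
        String base = String.join("/", dataSource, interval, version, String.valueOf(partitionNum));
        return useUniquePath ? base + "/" + UUID.randomUUID() : base;
    }
}
```

Two pushes for the same segment ID with useUniquePath set land in distinct directories, so a replica and a retry can both push safely; only the transactional metadata commit decides which copy is live.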