Make IngestSegmentFirehoseFactory splittable for parallel ingestion #7048
jihoonson merged 7 commits into apache:master from apollographql:glasser/ingest-segment-firehose-splittable
Conversation
I recognize that this is a PR without a matching issue, so it's more of a "proposal in the form of code" than a ready-to-merge pull request, and I will not feel like my time writing the code has been wasted if this change is unwanted. The PR currently has no tests; if the design of this feature is accepted, I'll write some.
Note: there's a comment in the PR about how
This suggests that it would be helpful to have a
@glasser thanks for working on this! I have one comment. As in other distributed systems like Hadoop or Spark, it's important to evenly distribute work in parallel indexing, because the entire ingestion is complete only when the longest task finishes. That means each sub task is supposed to process almost the same amount of data.
Good question. I was kind of imagining you would set taskGranularity equal to your output segmentGranularity so that each subtask would write one segment. You're right that things can get unbalanced, though. Are you imagining that the split implementation would query the segments metadata to learn all the segment sizes and the user would specify bytes per split? Would we try not to divide any input segments but just chunk them together? That seems like a reasonable option to want, but I kind of feel like people might still want to get started with the simpler "I know my peons can handle an hour of data, just split by hours" anyway... so implementing one of these options doesn't necessarily stop us from implementing the other later.
Yes, this is exactly what I want. The task can ask the coordinator to get the segment metadata. Tasks use

```json
{
  "2019-02-11T23:00:00.000Z/2019-02-12T00:00:00.000Z": {
    "size": 21459255,
    "count": 2
  },
  "2019-02-11T22:00:00.000Z/2019-02-11T23:00:00.000Z": {
    "size": 24510542,
    "count": 2
  }
}
```
My feeling is that if
I guess my logic is: we run the Kafka indexing service making one segment per task, so handling one segment per task feels doable and simple :) But of course KIS can be configured with max bytes as well. I like the point that we can have a default max bytes here. What are you imagining this would do for single segments larger than the max bytes setting?
(Er, single intervals, not single segments; I'm bad at terminology here.)
For an interval larger than the max bytes setting, I think each subTask should process a subset of segments, so that the large interval can be processed in parallel. For a segment larger than the max bytes setting, it's probably fine to process it with one subTask.
OK. And I guess it doesn't really matter how segments are assigned between tasks: all the locking and segment allocation works no matter how much overlap there is, right? Since this will require the subtasks to understand how to have specific segments named rather than just intervals, perhaps the task itself should be extended to allow you to specify a list of segments as an alternative to an interval in the first place?
Would you tell me more about what kind of overlap you're thinking of? I think it's fine if each task processes a disjoint subset of input segments.
I think we can extend
I was just stressing about what happens when combining the results of processing two segments from a given time chunk on different subtasks, but I think the answer is that everything just works fine. I have a few further questions about overlapping intervals.

(a) It looks like the
Alternatively, it seems like we could just use the same
In either case it looks like the client in question isn't immediately accessible. I think I would need to add an injection for CoordinatorClient directly into IngestSegmentFirehoseFactory, but it looks like several services use IndexingServiceFirehoseModule without binding CoordinatorClient. Would I want to add a binding for CoordinatorClient directly to IndexingServiceFirehoseModule? Or, if using SegmentListUsedAction, I would have to make sure to call setToolbox in the supervisor task like it is currently called in the sub task. That seems simpler than using CoordinatorClient.

(BTW, I notice that when IndexTask calls setToolbox on its firehose factory, it knows how to recurse through CombiningFirehoseFactory, but the parallel sub task does not do that. I think I could add a TaskToolboxConsumingFirehoseFactory interface implemented by IngestSegmentFirehoseFactory and CombiningFirehoseFactory to simplify this and make it more consistent.)

(b) I don't really know what happens when we have two segments with overlapping intervals and distinct versions, e.g. v1 @ 1:00-3:00, v2 @ 2:00-4:00. Is the data from 1:00-2:00 in the first segment supposed to be visible, or is the entire segment overshadowed? If it's the former, then specifically, when IngestSegmentFirehoseFactory calls
will the overshadowing still work if we're in a subtask where we're only passing the first segment in
@glasser good questions!

For (a), thank you for reminding me about

For (b), the first segment should be partially overshadowed. The part of v1 @ 1:00-2:00 should be available for input. So, you may want to adjust
Oh, I found it. In
But doesn't that holder come from the timeline, so it only works if the timeline was constructed from the full set of segments? I think what this means is that

This implies that instead of

Alternatively, the split operation needs to provide both the full list of segments and the split-specific segment list to each split firehose factory. (Or at least include an extra list of overlapping segments.)

(In other news, my TaskToolboxConsumingFirehoseFactory idea runs into trouble because CombiningFirehoseFactory is in druid-server, which doesn't have access to druid-indexing-service's TaskToolbox type. I suppose the interface could declare
Is your concern that different sub tasks can process overlapping segments of different versions, so that both of them produce data for overlapping intervals, which can cause an incorrect result? I think that's valid. To avoid this, we can add another restriction in split generation: each split should contain all overlapping segments. For example, suppose we have 3 segments: v1 @ 1:00-3:00, v2 @ 2:00-4:00, and v2 @ 4:00-6:00. Then we need 2 sub tasks, which process (v1 @ 1:00-3:00 and v2 @ 2:00-4:00) and (v2 @ 4:00-6:00), respectively. Each sub task should be able to generate a valid timeline from its given subset of segments. I think this restriction is acceptable because this kind of overlap won't be common. What do you think?
That sounds reasonable, with this caveat: you previously said:

and I think what you meant by that is that if there are a bunch of segments with the same interval and version (different partition num), it would be OK to split them up across sub tasks.

So I think the algorithm would be something like: list the segments for the whole interval as a timeline. Select the first segment, and take the set of all segments that overlap it, transitively. If this set of segments has more than one interval, then all of those segments are constrained to go in the same subtask. Otherwise, each of the segments in this set (all of which are for the same interval) may go in its own subtask. We've now partitioned the full set of segments into subsets that have to go together. We can then divide the whole list up into subtasks.

This is the bin packing problem (https://en.wikipedia.org/wiki/Bin_packing_problem). Rather than try to solve it optimally, or even use the first-fit algorithm, I would just use the greedy algorithm that goes down the list of subsets and assigns them to subtasks in order, since this is more likely to get segments of the same interval onto the same subtask and thus lead to fewer output segments.
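For concreteness, a minimal sketch of that greedy packing step. `SegmentGroup` and `assignSplits` are hypothetical simplifications for illustration, not Druid classes, and building the groups from the timeline is left out:

```java
import java.util.ArrayList;
import java.util.List;

public class GreedySplitPlanner
{
  // A set of segments that must stay in the same subtask: either a single
  // segment, or the transitive closure of overlapping segments whose
  // intervals span more than one distinct interval.
  public static class SegmentGroup
  {
    final List<String> segmentIds = new ArrayList<>();
    long bytes;
  }

  // Walk the time-ordered group list and greedily pack groups into splits,
  // starting a new split whenever the byte limit would be exceeded. A group
  // that is by itself larger than the limit still gets its own split.
  public static List<List<SegmentGroup>> assignSplits(List<SegmentGroup> groups, long maxBytesPerTask)
  {
    final List<List<SegmentGroup>> splits = new ArrayList<>();
    List<SegmentGroup> currentSplit = new ArrayList<>();
    long bytesInCurrentSplit = 0;

    for (SegmentGroup group : groups) {
      if (!currentSplit.isEmpty() && bytesInCurrentSplit + group.bytes > maxBytesPerTask) {
        splits.add(currentSplit);
        currentSplit = new ArrayList<>();
        bytesInCurrentSplit = 0;
      }
      currentSplit.add(group);
      bytesInCurrentSplit += group.bytes;
    }
    if (!currentSplit.isEmpty()) {
      splits.add(currentSplit);
    }
    return splits;
  }
}
```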
@glasser sounds nice! Are you planning to update your PR?
Yes, now that I think we have a concrete algorithm! I did just pull out the observation I had above, about the special-case setToolbox code in IndexTask not existing in ParallelIndexSubTask, into #7063.
Thanks! I'll take a look at it too.
What do you think a good default value for maxInputSegmentBytesPerTask is? I was thinking of just copying what the Kafka indexing service uses for its default segment size limit, but that's a row count, not bytes.
OK, I think this was overcomplicating it. An easier way to solve the overlapping segment problem is just to make the task specify directly to the subtask a

The sub-task won't need to recalculate a timeline at all: it'll just set up some WindowedStorageAdapters to read the appropriate parts of the segments.
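Roughly, the sub-task side could then look like this sketch. `assignedWindows` and `loadAdapterFor` are hypothetical names introduced here for illustration; `WindowedStorageAdapter` is the existing Druid class the comment refers to:

```java
// For each segment assigned to this sub-task, wrap its storage adapter in the
// window(s) the supervisor computed, so only the intended parts are read.
final List<WindowedStorageAdapter> adapters = new ArrayList<>();
for (Map.Entry<String, List<Interval>> entry : assignedWindows.entrySet()) {
  final StorageAdapter adapter = loadAdapterFor(entry.getKey()); // hypothetical loader
  for (Interval window : entry.getValue()) {
    adapters.add(new WindowedStorageAdapter(adapter, window));
  }
}
```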
I'll work on this once #7089 is merged. |
…7089) IndexTask had special-cased code to properly send a TaskToolbox to an IngestSegmentFirehoseFactory that's nested inside a CombiningFirehoseFactory, but ParallelIndexSubTask didn't.

This change refactors IngestSegmentFirehoseFactory so that it doesn't need a TaskToolbox; it instead gets a CoordinatorClient and a SegmentLoaderFactory directly injected into it.

This also refactors SegmentLoaderFactory so it doesn't depend on an injectable SegmentLoaderConfig, since its only method always replaces the preconfigured SegmentLoaderConfig anyway. This makes it possible to use SegmentLoaderFactory without setting druid.segmentCaches.locations to some dummy value.

Another goal of this PR is to make it possible for IngestSegmentFirehoseFactory to list data segments outside of connect() --- specifically, to make it a FiniteFirehoseFactory which can query the coordinator in order to calculate its splits. See #7048.

This also adds missing datasource name URL-encoding to an API used by CoordinatorBasedSegmentHandoffNotifier.
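In Jackson terms, the injection the commit describes might look roughly like this. This is a sketch only: the real constructor has more parameters, and the exact shape here is an assumption:

```java
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class IngestSegmentFirehoseFactory /* ... */
{
  private final String dataSource;
  private final CoordinatorClient coordinatorClient;
  private final SegmentLoaderFactory segmentLoaderFactory;

  @JsonCreator
  public IngestSegmentFirehoseFactory(
      @JsonProperty("dataSource") final String dataSource,
      // Injected when the spec is deserialized, so the factory no longer
      // needs a TaskToolbox handed to it via setToolbox after construction.
      @JacksonInject CoordinatorClient coordinatorClient,
      @JacksonInject SegmentLoaderFactory segmentLoaderFactory
  )
  {
    this.dataSource = dataSource;
    this.coordinatorClient = coordinatorClient;
    this.segmentLoaderFactory = segmentLoaderFactory;
  }
}
```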
@jihoonson OK, I have a basic implementation here. No new unit tests and haven't gotten all existing tests passing yet, but if you have a chance to take a basic look at the design that would be helpful! Will try to finish up the tests in the next couple days, but am curious if there are any major changes I'd need to make.
@glasser thanks! I'll take a look. |
I pulled out this observation into its own PR: #7171
I think we shouldn't document this because it's supposed to be used internally.
OK, and document interval as required?
Also, if the idea is to only use segments internally, I'll remove the support I had for splitting a task that has segments specified and only make the split work for interval tasks.
Another concern: Druid uses ZooKeeper for assigning tasks, which means the overlord writes task specs to ZooKeeper nodes to assign them. SegmentIds are safer than DataSegments because there's a limit on the max bytes that can be written to a node (by default about 1 MB, via jute.maxbuffer).
The goal of this class looks similar to TimelineObjectHolder's. TimelineObjectHolder represents each non-overshadowed logical segment, backed by a real segment which might be partially overshadowed. WindowedSegment is the opposite: it represents a real segment and which of its intervals are not overshadowed. Can we reuse TimelineObjectHolder?
I think that's a good idea. I still want to have WindowedSegmentId (maybe with a different name), since TimelineObjectHolder isn't serializable, but I could reduce the delta of this PR by using TimelineObjectHolder instead of WindowedSegment.
The only trickiness is that the only code Druid has for constructing List<TimelineObjectHolder> is VersionedIntervalTimeline, and as described above this isn't appropriate for a sub-task which only has a few segment IDs. So I have to construct it by hand, including making PartitionChunks and such. But that's probably pretty straightforward.
Thanks @jihoonson. It's much simpler using

It's probably worth another review pass now, though I still need to do unit tests. I'll wait to see what you think of #7171 before making the integration tests work.
Note that I'm holding off on cleaning up the integration tests here until #7171 is merged. I should write unit tests (e.g., of the splitting algorithm) though.
I think it's fine to add this class even though it could be replaced with TimelineObjectHolder, because replacing it would require making TimelineObjectHolder JSON-serializable, which needs another big refactoring.
However, I'm not sure why it carries a different form of information from TimelineObjectHolder. How about changing WindowedSegmentId to match it, i.e., representing the ID of a logical segment? Then the conversion logic from WindowedSegmentId to TimelineObjectHolder would be simpler.
The goal here is to ensure that we only send a given segment to one subtask, even if it has multiple windows. So it seemed reasonable to at least represent it in memory as an object that is "a segment with some windows". Does that seem right?
It also seemed nice to keep the representation that shows up in the task spec simpler (just segment IDs, not full DataSegments), though maybe that's the wrong tradeoff. I was especially leaning towards that when the idea was that you might manually write a spec with segment IDs.
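As a rough sketch of that "segment with some windows" shape (field names here are assumptions, not necessarily the PR's exact class):

```java
import java.util.List;
import org.joda.time.Interval;

// A real (partition-level) segment plus the windows of it that a subtask
// should actually read, i.e. its non-overshadowed intervals.
public class WindowedSegmentId
{
  private final String segmentId;
  private final List<Interval> intervals;

  public WindowedSegmentId(String segmentId, List<Interval> intervals)
  {
    this.segmentId = segmentId;
    this.intervals = intervals;
  }

  // Getters and Jackson annotations omitted in this sketch.
}
```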
- Get rid of WindowedSegment
- Don't document 'segments' parameter or support splitting firehoses that use it
- Require 'intervals' in WindowedSegmentId (since it won't be written by hand)
I'd like to leave this PR in a potentially mergeable state, but I still would appreciate reviewer eyes on the questions I'm removing here.
I've added a unit test that I feel pretty good about, which takes advantage of the existing useful test cases in IngestSegmentFirehoseFactoryTimelineTest. I have two questions about the impact on CompactionTask from this change that I had added as FIXME comments earlier. The last commit now removes those comments (so that the PR could be in theoretically-mergeable state) but I'd still appreciate feedback on those questions.
```java
while (firehose.hasMore()) {
  final InputRow row = firehose.nextRow();
  count++;
  sum += row.getMetric(METRICS[0]).longValue();
```
TeamCity complains that both method calls here may cause NPEs. It is just a duplicate of existing code with the same problem in testSimple, though.
```java
    .stream()
    .filter(s -> s.getId().toString().equals(segmentId))
    .findAny()
    .get(); // throwing if not found is exactly what the real code does
```
TeamCity doesn't want me to call Optional.get() without ifPresent() because it might throw, but I'm OK with it throwing. Not sure if I should change anything?
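If the inspection needs quieting, one option that keeps the same fail-fast behavior with a clearer message (the message text here is just illustrative):

```java
    .stream()
    .filter(s -> s.getId().toString().equals(segmentId))
    .findAny()
    .orElseThrow(() -> new IllegalStateException("Cannot find segment " + segmentId));
```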
Most of the TeamCity errors are in untouched code. Two of them are in new code from me but I'm not sure what changes would be useful (see my comments).
It was probably because of the changed IntelliJ version, which is reverted back now. I restarted TeamCity.
Good, TeamCity passed this time. |
```diff
 This Firehose can be used to read the data from existing druid segments.
-It can be used ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment.
+It can be used to ingest existing druid segments using a new schema and change the name, dimensions, metrics, rollup, etc. of the segment.
+This firehose is _splittable_ and can be used by [native parallel index tasks](./native_tasks.html#parallel-index-task).
```
Would you please add IngestSegmentFirehose to https://github.com/apache/incubator-druid/blob/master/docs/content/ingestion/native_tasks.md#parallel-index-task too?
```java
  private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 150 * 1024 * 1024;
  private final String dataSource;
  private final Interval interval;
  private final List<WindowedSegmentId> segmentIds;
```
Would you please add @Nullable for interval and segmentIds, and a comment about when they can be null?
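For example (the comment below is my reading of the intended semantics from this thread, not the PR's wording):

```java
  // Exactly one of interval and segmentIds should be non-null: interval when
  // the user writes the spec directly, segmentIds when this factory is a
  // split produced by the parallel supervisor task.
  @Nullable
  private final Interval interval;
  @Nullable
  private final List<WindowedSegmentId> segmentIds;
```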
```java
  private final List<String> dimensions;
  private final List<String> metrics;
  private final long maxInputSegmentBytesPerTask;
  private List<InputSplit<List<WindowedSegmentId>>> splits;
```
nit: the code would be easier to read if final and non-final variables were separated.
| @JsonProperty("interval") Interval interval, | ||
| // Specifying "segments" is intended only for when this FirehoseFactory has split itself, | ||
| // not for direct end user use. | ||
| @JsonProperty("segments") List<WindowedSegmentId> segmentIds, |
Would you please add @Nullable for interval and segmentIds?
```java
public class IngestSegmentFirehoseFactory implements FiniteFirehoseFactory<InputRowParser, List<WindowedSegmentId>>
{
  private static final EmittingLogger log = new EmittingLogger(IngestSegmentFirehoseFactory.class);
  private static final long DEFAULT_MAX_INPUT_SEGMENT_BYTES_PER_TASK = 150 * 1024 * 1024;
```
Wondering how you chose this default value. Do you have any rationale?
You know, I swear I did when I wrote this, but I can't remember now and I clearly didn't write it down. Do you have any suggestions?
The default maxRowsPerSegment for Kafka indexing seems like a reasonable place to start looking, but then one has to think about how many bytes are in a typical row and how many segments we'd like each task to produce. My default here is probably too low?
This looks quite small, but I'm fine with raising it in a follow-up PR.
| "type": "ingestSegment", | ||
| "dataSource": "%%DATASOURCE%%", | ||
| "interval": "2013-08-31/2013-09-02", | ||
| "maxInputSegmentBytesPerTask": 1 |
Did you set this to 1 intentionally?
Yes: that's what ensures that the test uses multiple subtasks.
| currentSplit = new ArrayList<>(); | ||
| bytesInCurrentSplit = 0; | ||
| } | ||
| if (segmentBytes > maxInputSegmentBytesPerTask) { |
Hmm, this might not be what we want. The idea of size-based splitting is to process roughly the same amount of data in each subTask. How ingestSegmentFirehose actually works is: 1) per segment, it reads the timestamp column, and 2) it reads the other columns only if the timestamp falls in the given interval (QueryableIndexStorageAdapter.CursorSequenceBuilder).

But this if clause compares the entire segment size against maxInputSegmentBytesPerTask even if the interval covers only a part of the segment. I guess we may need to roughly estimate the bytes to be read, based on the ratio of the window interval to the whole interval of the segment?
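A back-of-the-envelope version of that estimate, purely illustrative (the class and method names are made up for this sketch):

```java
import org.joda.time.Interval;

public class SegmentSizeEstimator
{
  // Estimate the bytes a subtask will actually read from a segment by scaling
  // its on-disk size by the fraction of the segment's interval the window covers.
  public static long estimateReadBytes(long segmentBytes, Interval segmentInterval, Interval window)
  {
    final Interval overlap = segmentInterval.overlap(window);
    if (overlap == null) {
      return 0;
    }
    final long totalMillis = segmentInterval.toDurationMillis();
    if (totalMillis == 0) {
      return segmentBytes; // degenerate zero-length interval; assume a full read
    }
    final double ratio = (double) overlap.toDurationMillis() / totalMillis;
    return (long) (segmentBytes * ratio);
  }
}
```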
I'm happy to implement this today, but a couple of counterpoints before I do so:

- Size-based splitting does help keep subtasks equal, but it also has the purpose of bounding the amount of disk space needed for each task (because the current implementation downloads all segments at once at the beginning of the task), at least as long as all individual segments fall below the boundary. It would be a shame if a single task ended up requiring an enormous amount of disk space (and download time) because it was assigned a large number of segments which only contribute a small fraction of their data to the output segment.
- In practice I expect that most people's segments have uniform granularity (or at least are non-overlapping, e.g. older data having uniform day granularity and newer data having uniform hour granularity), and so the only time that windows will not be the full segment size is at the beginning and end of the total interval. And even having misalignment there will be a mistake if you're using this to re-ingest data into itself with `appendToSegments: false`, since it will drop data in the start and end segments that are outside of the interval. So I think windowing is not going to be a super common use case in practice, and doing a performance optimization that only matters for the windowing use case might not be worth the extra code complexity and more complicated documentation needed for `maxInputSegmentBytesPerTask`. (Of course the overlapping segment use case needs to be supported for correctness, because that's how the Druid data model works. But that doesn't mean it needs performance optimizations?)
> In practice I expect that most people's segments have uniform granularity (or at least are non-overlapping, eg older data having uniform day granularity and newer data having uniform hour granularity) and so the only time that windows will not be the full segment size is at the beginning and end of the total interval. And even having misalignment there will be a mistake if you're using this to re-ingest data into itself with appendToSegments: false since it will drop data in the start and end segments that are outside of the interval. So I think windowing is not going to be a super common use case in practice, and so doing a performance optimization that only matters for the windowing use case might not be worth the extra code complexity and more complicated documentation needed for maxInputSegmentBytesPerTask. (Of course the overlapping segment use case needs to be supported for correctness because that's how the Druid data model works. But that doesn't mean it needs performance optimizations?)

If I understand correctly, windowing would happen depending on the value of maxInputSegmentBytesPerTask rather than segment granularity, because a split is created whenever bytesInCurrentSplit + segmentBytes is larger than maxInputSegmentBytesPerTask. Am I missing something?

> Size-based split does help keep subtasks equal, but it also has the purpose of bounding the amount of disk space needed for each task (because the current implementation downloads all segments at once at the beginning of the task), at least as long as all individual segments fall below the boundary. It would be a shame if a single task ended up requiring an enormous amount of disk space (and download time) if it ended up being assigned a large number of segments which only contribute a small fraction of their data to the output segment.

Good point. I think it would be an issue, but maybe it would be better to handle it in IngestSegmentFirehoseFactory, e.g. by lazily downloading segments whenever needed (probably worth combining with PrefetchableTextFilesFirehoseFactory?). But this should be done in a separate PR and I'm fine with going as it is. Would you please add a comment about this concern?
> If I understand correctly, windowing would happen depending on the value of maxInputSegmentBytesPerTask rather than segment granularity because a split is created whenever bytesInCurrentSplit + segmentBytes is larger than maxInputSegmentBytesPerTask. Am I missing something?

Individual input segments are never split up over multiple tasks. The only times that the window on a segment isn't that segment's full interval are when the segment goes outside of the overall task's boundaries or when part of the segment is overshadowed. So my point in that paragraph was that in practice, the ratio you're describing will generally be 100%, and I'm not sure the added complexity (including making the meaning of maxInputSegmentBytesPerTask more complex) is worth it.

I will add a comment about lazily downloading segments. I suppose there's something positive about failing fast if segments can't be found, but that's probably not that important.
glasser left a comment
Thanks, I made the other changes you suggested.