Feature for hadoop batch re-ingestion and delta ingestion #1374
drcrallen merged 7 commits into apache:master
Conversation
How would you feel about having a DataSegmentInputSplit as per https://github.com/druid-io/druid/pull/1351/files#diff-e3e429f5bc3e9ee67f25b69b7e6c3969R981 ?
Alternatively, we could have a "HadoopDataSegment" which has a hadoopy loadSpec that is only a URI parsable by hadoop (in addition to the other things a DataSegment usually has). That could allow the peon/overlord to set things up correctly for hadoop.
We have the data to get all the splits done completely, it is just a matter of ensuring the data gets propagated to Hadoop correctly.
I think your DataSegmentSplit and my DatasourceInputSplit are pretty much the same (and reusable), except I'm just keeping "path" in mine while you are keeping the whole DataSegment inside yours. Do you really need all of DataSegment? Can we just keep "loadSpec" in the input split? That information should be enough to load the segment.
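As a rough illustration of the loadSpec-only idea being discussed, here is a minimal sketch of a split that carries just the segment's serialized loadSpec and its size. All names here are hypothetical and this is not the PR's actual code; the real class would implement Hadoop's `InputSplit`/`Writable`, which is mimicked here with plain `java.io`:

```java
import java.io.*;

// Hypothetical sketch: a split carrying only the segment's serialized
// loadSpec (as a JSON string) plus its size, not a whole DataSegment.
class DatasourceInputSplit {
    private String loadSpecJson; // e.g. {"type":"local","path":"/tmp/seg.zip"}
    private long size;           // lets Hadoop size and order the splits

    public DatasourceInputSplit() {}  // no-arg ctor, Writable-deserialization style

    public DatasourceInputSplit(String loadSpecJson, long size) {
        this.loadSpecJson = loadSpecJson;
        this.size = size;
    }

    public long getLength() { return size; }
    public String getLoadSpecJson() { return loadSpecJson; }

    // Hadoop Writable-style serialization, using only java.io
    public void write(DataOutput out) throws IOException {
        out.writeUTF(loadSpecJson);
        out.writeLong(size);
    }

    public void readFields(DataInput in) throws IOException {
        loadSpecJson = in.readUTF();
        size = in.readLong();
    }

    // Helper to exercise the wire format without checked exceptions.
    public static DatasourceInputSplit roundTrip(DatasourceInputSplit s) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            s.write(new DataOutputStream(bos));
            DatasourceInputSplit out = new DatasourceInputSplit();
            out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of keeping only the loadSpec is that everything needed to fetch the segment crosses the wire, without dragging the rest of DataSegment's metadata into the split.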
It looks like we are adjusting things to make the Mapper understand more and more different types of objects. I think it might be cleaner to define the Mapper interface in terms of InputRow objects and move the parser and related logic into the InputFormat.
This would have the downside of making it a bit more difficult to take other InputFormats, but would have the plus of getting rid of the parser when the parser is not needed (it becomes a part of the InputFormat instead of the mapper).
If the Mapper takes InputRow, then the row might contain a complex metric object such as HyperLogLogCollector, and the mapper needs to know how to serialize it.
On the other hand, if we introduce a HadoopWritableInputRow and enforce that the mapper only takes those (not done now, to keep this feature PR simple; I decided to put in the if/else instead), then complete ownership of reading/parsing data goes to the InputFormat (which should understand the input data best) and the mapper is assured of not running into serde issues.
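To make the HadoopWritableInputRow idea concrete: the InputFormat would emit rows in an already-Writable form, so the mapper only ever sees primitives and never needs serde for complex metric objects. A minimal sketch under that assumption (all names hypothetical, plain `java.io` standing in for Hadoop's `Writable`):

```java
import java.io.*;
import java.util.*;

// Hypothetical sketch of a Writable-style input row: only the timestamp and
// string dimensions cross the wire, so the mapper needs no parser and no
// custom serde for metric objects like HyperLogLogCollector.
class WritableInputRow {
    private long timestamp;
    private Map<String, String> dimensions = new LinkedHashMap<>();

    public void set(long ts, Map<String, String> dims) {
        timestamp = ts;
        dimensions = new LinkedHashMap<>(dims);
    }

    public long getTimestamp() { return timestamp; }
    public Map<String, String> getDimensions() { return dimensions; }

    public void write(DataOutput out) throws IOException {
        out.writeLong(timestamp);
        out.writeInt(dimensions.size());
        for (Map.Entry<String, String> e : dimensions.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    public void readFields(DataInput in) throws IOException {
        timestamp = in.readLong();
        int n = in.readInt();
        dimensions = new LinkedHashMap<>();
        for (int i = 0; i < n; i++) {
            dimensions.put(in.readUTF(), in.readUTF());
        }
    }

    // Helper to exercise the wire format without checked exceptions.
    public static WritableInputRow roundTrip(WritableInputRow row) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            row.write(new DataOutputStream(bos));
            WritableInputRow out = new WritableInputRow();
            out.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
            return out;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```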
The high-level direction seems good. The biggest question in my mind is: do we keep the Writable InputRow, or do we adjust the jobs to pull out the key/value and all that now?
914b965 to bbe682f
@himanshug @cheddar hey guys just wondering what the status of this is

@himanshug #1472 is merged

@fjy thanks, I will update this one soon, just doing some testing of the code to make sure that it works.
bbe682f to 12222cf
👍 after latest changes
Very minor nitpick, but this could be a Closer instead of a list of queryableIndex.
I'm fine with it as is, but for future reference a Closer would make it more obvious what's happening here.
hmmm, for now, let us keep it as is.
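For future readers, the Closer pattern suggested above (Guava ships a real `Closer` class) registers resources as they are opened and closes them in reverse order in one place, rather than tracking a list of open indexes by hand. A minimal stdlib-only sketch of the idea, not Guava's actual implementation:

```java
import java.io.*;
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal sketch of the Closer pattern: register resources as you open them,
// close them all (in reverse order) with a single close() call.
class SimpleCloser implements Closeable {
    private final Deque<Closeable> resources = new ArrayDeque<>();

    public <T extends Closeable> T register(T resource) {
        resources.push(resource);  // push so close() runs in reverse order
        return resource;
    }

    @Override
    public void close() {
        IOException first = null;
        while (!resources.isEmpty()) {
            try {
                resources.pop().close();
            } catch (IOException e) {
                if (first == null) { first = e; } else { first.addSuppressed(e); }
            }
        }
        if (first != null) throw new UncheckedIOException(first);
    }
}
```

Compared to a bare `List<QueryableIndex>` plus a manual cleanup loop, all the "what gets closed, and when" logic lives in one obvious place.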
@himanshug I do have a concern about concurrency here: if I use delta ingestion but do not have a lock on the interval, then whatever I ingest may or may not actually be representative of the latest data. A simple example is running delta ingestion twice at the same time for different input paths to add (or starting a second run before the first has finished). The indexing service hadoop indexer should be handling locks correctly, and I think such a race condition should not be able to proceed when the indexing service is used. But the standalone hadoop indexer could very easily encounter a race. Since we in general do not support accounting for race conditions in the standalone stuff, that is just how it's going to be. Does that sound correct?

@drcrallen that sounds right to me. The stance on standalone stuff (hadoop & realtime) has always been that you need to think about consistency and concurrency yourself, since Druid only does meaningful locking when you use the indexing service.

@drcrallen wrt concurrency, that is true. But this PR is not introducing or changing that behavior. The issue you described with the standalone hadoop indexer exists even today and is not really related to this PR.

@himanshug / @gianm Cool, I'm good with maintaining existing behavior; I just wanted to make sure our expectations are documented in case anyone ever comes back to this PR's comment thread.
…ecomes reusable Conflicts: indexing-service/src/main/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactory.java
Conflicts: indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java indexing-service/pom.xml
… we can grab data from multiple places in same ingestion Conflicts: indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java indexing-hadoop/src/main/java/io/druid/indexer/JobHelper.java Conflicts: indexing-hadoop/src/main/java/io/druid/indexer/path/PathSpec.java
…xer uses overlord action to get list of segments and passes when running as an overlord task. and, uses metadata store directly when running as standalone hadoop indexer also, serialized list of segments is passed to DatasourcePathSpec so that hadoop classloader issues do not creep up
f3e2c61 to cfd81bf
@gianm @drcrallen @nishantmonu51
Convention in some other places is to check for null and set the field to ImmutableList.of() if the argument is null. Would that be appropriate to use here? It seems most of the logic will throw errors if this is not set to a non-empty list.
No, it is valid for segments to not be specified; the check is done as the first thing inside addInputPaths(..), and it is expected to be set by then.
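For reference, the null-to-empty convention the reviewer is describing looks like the sketch below (field and class names hypothetical; note the PR itself deliberately keeps null as a valid "not specified" state and validates later in addInputPaths):

```java
import java.util.Collections;
import java.util.List;

// Illustrative sketch of the convention discussed above: a constructor
// argument that may be absent is normalized to an empty list up front
// (Guava code would use ImmutableList.of() instead of Collections.emptyList()).
class DatasourcePathSpecSketch {
    private final List<String> segments;  // hypothetical field

    public DatasourcePathSpecSketch(List<String> segments) {
        this.segments = segments == null ? Collections.emptyList() : segments;
    }

    public List<String> getSegments() { return segments; }
}
```

The trade-off is exactly the one in this thread: normalizing eagerly removes null checks downstream, but erases the distinction between "not specified yet" and "specified as empty", which this PR relies on.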
I'm 👍 if we can get one other committer to agree with the comment in #1374 (comment)

👍 at a3bab5b
Feature for hadoop batch re-ingestion and delta ingestion
This PR implements the following features.
We implement a "DatasourceInputFormat" and "DatasourcePathSpec" that can be used to batch re-ingest data back into druid from an existing Datasource's segments (something like IngestSegmentFirehose).
It also provides changes to allow specifying multiple PathSpecs (by using "MultiplePathSpec") in batch ingestion, so that you can combine both DatasourcePathSpec and StaticPathSpec (or any other) in the same ingestion to add late-arriving data to an already-ingested interval (aka "Delta" Ingestion).
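As an illustration, a combined delta-ingestion pathSpec might look roughly like the fragment below. Field names follow the "multi" and "dataSource" path spec shapes documented in later Druid releases; the exact names and structure in this PR may differ, and the datasource name, interval, and path are made up:

```json
"pathSpec": {
  "type": "multi",
  "children": [
    {
      "type": "dataSource",
      "ingestionSpec": {
        "dataSource": "wikipedia",
        "intervals": ["2014-10-20T00:00:00Z/P1D"]
      }
    },
    {
      "type": "static",
      "paths": "/path/to/lateArrivingData"
    }
  ]
}
```

The first child re-reads the already-ingested segments for the interval; the second adds the late-arriving raw data, and the job reindexes both together.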
original proposal discussion: https://groups.google.com/forum/?utm_medium=email&utm_source=footer#!msg/druid-development/EXYAPcV6pk4/Nu1uRGKEctAJ