Skip to content

Add DruidInputSource (replacement for IngestSegmentFirehose)#8982

Merged
jon-wei merged 13 commits intoapache:masterfrom
jon-wei:druidinputsource2
Dec 6, 2019
Merged

Add DruidInputSource (replacement for IngestSegmentFirehose)#8982
jon-wei merged 13 commits intoapache:masterfrom
jon-wei:druidinputsource2

Conversation

@jon-wei
Copy link
Copy Markdown
Contributor

@jon-wei jon-wei commented Dec 3, 2019

Based on an initial patch by @jihoonson

This PR adds DruidInputSource and associated classes, a new replacement for IngestSegmentFirehose. This input source always has a fixed input format (DruidInputFormat). The row reading class is DruidSegmentReader and the input entity is DruidInputEntity.

The new input source has the same parameters as the existing IngestSegmentFirehose, and has similar code.

Additionally:

  • The PR adjusts the CompactionTask so that it uses DruidInputSource instead of the old firehose.
  • Fixes an issue where the value of inputSource.needsFormat() was not being respected.
  • The new input source avoids the issue described in ingestSegment firehose is not friendly to the sampler #8448, which is present in the old firehose

For testing, this patch relies on the existing CompactionTask tests for checking functionality of the new input source.

A test where an IngestSegmentFirehose is used for reindexing with a native batch task (non-parallel) has also been added to CompactionRunTaskTest. The existing ITParallelIndexTaskTest still uses a firehose instead of input source.

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths.
  • added integration tests.
  • been tested in a test Druid cluster.

@jon-wei
Copy link
Copy Markdown
Contributor Author

jon-wei commented Dec 3, 2019

Marking WIP, need to update the docs, and will add an integration test to ITParallelIndexTaskTest that uses the new input source.

@vogievetsky
Copy link
Copy Markdown
Contributor

Tested this (as a user) from the console (via #8828 ) and it works

@jon-wei jon-wei removed the WIP label Dec 4, 2019
@jon-wei
Copy link
Copy Markdown
Contributor Author

jon-wei commented Dec 4, 2019

Added a doc entry for the new input source. We'll need a larger rework of the docs for the new input sources/input formats, but that should be handled in another PR.

Adjusted ITParallelIndexTaskTest so that it also runs reingestion using DruidInputSource

{
"type": "druid",
"dataSource": "wikipedia",
"interval": "2013-01-01/2013-01-02"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: It would be nice if either this example or a 2nd example included dimensions, metrics, and filter

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a second example with more explanation of what the example specs do

Comment thread docs/ingestion/native-batch.md Outdated
|property|description|required?|
|--------|-----------|---------|
|type|This should be "druid".|yes|
|dataSource|A String defining the data source to fetch rows from, very similar to a table in a relational database|yes|
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this part 'very similar to a table in a relational database' feels sort of out of place to me

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reworded this part and other parts of this table

),
source.getIntervalFilter()
);
final Sequence<Map<String, Object>> sequence = Sequences.concat(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is pretty beastly to follow and I think could definitely stand to be broken down into a couple of methods. I suggest one that makes a Sequence from a Cursor, and one that makes a CloseableIterator from a Sequence. Then this part can be simplified to something like:

    final Sequence<Cursor> cursors = storageAdapter.getAdapter().makeCursors(
        Filters.toFilter(dimFilter),
        storageAdapter.getInterval(),
        VirtualColumns.EMPTY,
        Granularities.ALL,
        false,
        null
    );

    final Sequence<Map<String, Object>> sequence = Sequences.concat(Sequences.map(cursors, this::cursorToSequence));
    return sequenceToCloseableIterator(segmentFile, sequence);

which makes this much more approachable imo

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adjusted this to use the suggested structure

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice refactoring!

public void close()
{
if (!segmentFile.delete()) {
// log
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this a placeholder for a log message?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Filled in a warning log message

Interval lastInterval = null;
for (Interval interval : timeline.keySet()) {
if (lastInterval != null) {
if (interval.overlaps(lastInterval)) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: i know this isn't really new code, but these if statements can be collapsed

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collapsed the ifs

// segments to olders.

// timelineSegments are sorted in order of interval
int[] index = {0};
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm... so ugly, though I guess better than AtomicInteger 😢. This should be final at least I think? Alternatively, should this just be implemented how getUniqueDimensions is so that they are more consistent?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried changing it to be the same as the dimensions version, but because there is only a presence check for the metrics version (no "exclusions" like with dimensions), the map.put becomes a style failure (tells me to use computeIfAbsent instead)

Made this final

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be MutableInt, but I think the original code is ok since it's just reusing the existing code.

{
Preconditions.checkNotNull(interval);

// This call used to use the TaskActionClient, so for compatibility we use the same retry configuration
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider this a non-blocking discussion point, but should this maybe switch to using RetryUtils?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left this for now

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why this has its own retry logic too.. I think it's worth to consider it for future refactoring.

}

@VisibleForTesting
private static List<String> getUniqueDimensions(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Would this method and getUniqueMetrics maybe more appropriately live in VersionedIntervalTimeline?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method does not seem to be covered by any of the tests in org.apache.druid.indexing

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved these to VersionedIntervalTimeline, IngestSegmentFirehoseFactory.testGetUniqueDimensionsAndMetrics should be covering these now

}

@VisibleForTesting
private static List<String> getUniqueDimensions(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method does not seem to be covered by any of the tests in org.apache.druid.indexing

}

@VisibleForTesting
private static List<String> getUniqueMetrics(List<TimelineObjectHolder<String, DataSegment>> timelineSegments)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method does not seem to be covered by any of the tests in org.apache.druid.indexing

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved these to VersionedIntervalTimeline, IngestSegmentFirehoseFactory.testGetUniqueDimensionsAndMetrics should be covering these now

Comment on lines +157 to +161
List<String> dimVals = new ArrayList<>(valsSize);
for (int i = 0; i < valsSize; ++i) {
dimVals.add(selector.lookupName(vals.get(i)));
}
theEvent.put(dim, dimVals);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part does not seem to be covered by any of the tests in org.apache.druid.indexing

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I adjusted CompactionRunTaskTest so that there's a multivalued dimension (this case handles that), and added a row correctness check to testRun there

Comment on lines +338 to +341
// This segment won't fit in the current non-empty split, so this split is done.
splits.add(new InputSplit<>(currentSplit));
currentSplit = new ArrayList<>();
bytesInCurrentSplit = 0;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be worth having a unit test for this method. Looks like this part is not covered by any of the tests in org.apache.druid.indexing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added testDruidInputSourceCreateSplitsWithIndividualSplits to CompactionTaskParallelRunTest

Copy link
Copy Markdown
Contributor

@ccaominh ccaominh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM 👍

Comment thread docs/ingestion/native-batch.md Outdated
|dataSource|A String defining the Druid datasource to fetch rows from|yes|
|interval|A String representing the ISO-8601 interval, which defines the time range to fetch the data over.|yes|
|dimensions|A list of Strings containing the names of dimension columns to select from the Druid datasource. If the list is empty, no dimensions are returned. If null, all dimensions are returned. |no|
|metrics|The list of Strings containg the names of metric columns to select. If the list is empty, no metrics are returned. If null, all metrics are returned.|no|
Copy link
Copy Markdown
Contributor

@ccaominh ccaominh Dec 5, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: containg -> containing

(travis failed on docs spellcheck)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed spelling

}

@VisibleForTesting
public static List<String> getUniqueDimensions(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The place of these methods seems a bit unintuitive to me. Their first parameter is a list of TimelineObjectHolders because I think it's easier to use in DruidInputSource and IngestSegmentFirehose. But, in fact, a list of DataSegments would be enough. Is there any other place that needs to use these methods other than DruidInputSource or IngestSegmentFirehose? If not, I guess a new util class such as CompactionUtils or DataSegments might be better?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved these to a new class ReingestionTimelineUtils and added some javadocs

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing comments within the method mention ordering based on recency of segments, so I kept the input as TimelineObjectHolders

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, so I had suggested moving them to VersionedIntervalTimeline because they seem like generic timeline operations, not specific to re-ingesting segments. Is having a ReingestionTimelineUtils better than their original home if they are still going to live somewhere specific to re-ingesting segments?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, my only view on this is it should be somewhere other than IngestSegmentFirehoseFactory since that's headed for eventual deprecation.

It could go in VersionedIntervalTimeline although that class is a bit weird (it accepts generic types but also has methods that specifically use DataSegment). I feel like the utils class is fine for now

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My stance is similar to @jon-wei. It could be used for more general use cases, but I don't see them now. Maybe we can move them from ReingestionTimelineUtils to somewhere else if we need in the future.

// segments to olders.

// timelineSegments are sorted in order of interval
int[] index = {0};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It could be MutableInt, but I think the original code is ok since it's just reusing the existing code.

Copy link
Copy Markdown
Contributor

@jihoonson jihoonson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jon-wei thank you for making this as a PR!

{
Preconditions.checkNotNull(interval);

// This call used to use the TaskActionClient, so for compatibility we use the same retry configuration
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why this has its own retry logic too.. I think it's worth to consider it for future refactoring.

}

@Override
public void remove()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the Iterator interface has the default implementation for remove() throwing UnsupportedOperationException.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the redundant override

),
source.getIntervalFilter()
);
final Sequence<Map<String, Object>> sequence = Sequences.concat(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice refactoring!

Copy link
Copy Markdown
Member

@clintropolis clintropolis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@jon-wei jon-wei merged commit c949a25 into apache:master Dec 6, 2019
@jon-wei jon-wei added this to the 0.17.0 milestone Dec 17, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants