Introduce "transformSpec" at ingest-time. #4890
Conversation
It accepts a "filter" (standard query filter object) and "transforms" (a list of objects with "name" and "expression"). These can be used to do filtering and single-row transforms without the need for a separate data processing job. The "expression" fields use the same expression language as other expression-based features.
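Based on that description, a transformSpec might look like the following. This is a sketch: the "selector" filter and the concat expression are made-up illustrations of the two fields, not taken from this PR.

```json
"transformSpec": {
  "filter": {
    "type": "selector",
    "dimension": "country",
    "value": "US"
  },
  "transforms": [
    { "name": "fullName", "expression": "concat(firstName, ' ', lastName)" }
  ]
}
```

Here the filter drops any row where country is not "US", and the single transform adds a "fullName" column computed from two existing columns of the same row.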
@gianm can you go a little deeper on the motivation here? Previously the Druid stuff was solely dedicated to indexing. Is the purpose here to add more advanced ETL because people don't want to run more than one type of "worker"? Or is it because there are common, very primitive ETLs that will help 85% of the use cases you see? As an alternative, what if we had more "indexing" extensions that plugged into different data writers or data sinks of other systems? Like a Spark writer / Hadoop output format, or stuff specialized for certain other data processing frameworks?
@drcrallen The motivation is to add some very basic stateless transforms to the Druid indexer to save people from the cost and hassle of running extra jobs when they want to do something basic. It's the same idea as the flattenSpec that exists for JSON and Avro. Some examples of problems that this solves:
For people that are just trying to plug Kafka streams into Druid and don't have stream processing infrastructure set up, this capability is huge. (A lot of people have pretty bare-bones setups.) Even for people that do have a stream processor set up, this approach is cheaper, since it doesn't require setting aside capacity in Kafka for processing and retention of the transformed topic.
👍
@drcrallen Does that rationale seem reasonable to you?
if (valueMatcher != null) {
  rowSupplierForValueMatcher.set(transformedRow);
  if (!valueMatcher.matches()) {
    return null;
Most indexers don't have the ability to filter out null rows and instead fail with an NPE. Does this filtering work with either the Hadoop indexing job or the Kafka indexing service?
@himanshug that's a good point, I tested it with IndexTask but not every form. IndexTask works since it has this code:
// The null inputRow means the caller must skip this row.
if (inputRow == null) {
continue;
}
I'll add some tests for kafka, hadoop, and realtime tasks as well.
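The caller-side contract above can be sketched in plain Java. This is a standalone illustration, not Druid's actual IndexTask code: the String rows and the Predicate filter are stand-ins for InputRow and the transformSpec filter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class NullSkippingIngest
{
  // A filtering parse returns null for rows the filter rejects; the caller
  // must treat null as "skip this row" rather than dereference it.
  public static List<String> ingest(List<String> rawRows, Predicate<String> filter)
  {
    final List<String> kept = new ArrayList<>();
    for (String raw : rawRows) {
      final String row = filter.test(raw) ? raw : null;
      // The null inputRow means the caller must skip this row.
      if (row == null) {
        continue;
      }
      kept.add(row);
    }
    return kept;
  }

  public static void main(String[] args)
  {
    // Rows starting with "drop" are filtered; the caller skips them safely.
    System.out.println(ingest(Arrays.asList("a", "drop-me", "b"), s -> !s.startsWith("drop")));
  }
}
```

An indexer that lacks the `if (row == null) continue;` step is exactly the NPE case raised above, which is why each task type needs its own test.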
if (transforms.isEmpty()) {
  transformedRow = row;
} else {
  transformedRow = new TransformedInputRow(row, transforms);
also, can you adjust https://github.com/druid-io/druid/blob/master/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceRecordReader.java#L145 for hadoop re-indexing where rows don't go through the configured parser.
Hmm, I think IngestSegmentFirehoseFactory doesn't work right either, it also bypasses the parser. Will try to do something to make this work for both of them and add some tests.
Yeah, it's even better to handle this in IngestSegmentFirehoseFactory; then DatasourceRecordReader will get it automatically.
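The wrapping step from the diff hunk above can be sketched as follows. This is a hedged illustration of the idea only: the Transform type and the Map-based row are stand-ins, not Druid's actual TransformedInputRow implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class TransformedInputRowSketch
{
  // Stand-in for a Druid transform: a named single-row expression.
  public static final class Transform
  {
    final String name;
    final Function<Map<String, Object>, Object> expression;

    public Transform(String name, Function<Map<String, Object>, Object> expression)
    {
      this.name = name;
      this.expression = expression;
    }
  }

  // Column lookup on the wrapped row: transformed columns shadow the
  // underlying ones; all other columns pass through unchanged.
  public static Object getRaw(Map<String, Object> row, Map<String, Transform> transforms, String column)
  {
    final Transform t = transforms.get(column);
    return t != null ? t.expression.apply(row) : row.get(column);
  }

  public static void main(String[] args)
  {
    final Map<String, Object> row = new HashMap<>();
    row.put("x", 3L);

    final Map<String, Transform> transforms = new HashMap<>();
    transforms.put("xPlusOne", new Transform("xPlusOne", r -> ((Long) r.get("x")) + 1L));

    System.out.println(getRaw(row, transforms, "xPlusOne")); // transformed column
    System.out.println(getRaw(row, transforms, "x"));        // pass-through column
  }
}
```

When the transform list is empty, the diff skips the wrapper entirely, which avoids any per-column indirection on the hot path.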
@drcrallen @gianm sounds good as a feature; I have also seen the need for simple transformations at ingestion time.
@himanshug it's just undocumented because nothing expression-related is documented yet. I'm intending to do another patch before too long that will add documentation for all of them, linking through to the existing docs. Also, for the reason in the original comment: there is not really a clear place to put docs for stuff in dataSchema, since the docs probably need a refactor.
Marking WIP until I can add more tests. |
- Add nullable annotation to Firehose.nextRow.
- Add tests for index task, realtime task, kafka task, hadoop mapper, and ingestSegment firehose.
@himanshug I would say a transform can be used to add columns (as long as those columns are functions of existing ones…). And I guess you could use it to sort of "remove" a column, in a sense, by overwriting it with one that is just all nulls? Anyway, I just pushed a new javadoc that is clearer. I also took your suggestion in #4890 (comment) and added the sanity check.
@gianm I didn't realize all the possibilities, so that doc helps. Thanks.
* Fix havingSpec on complex aggregators.
  - Uses the technique from #4883 on DimFilterHavingSpec too.
  - Also uses Transformers from #4890, necessitating a move of that and other related classes from druid-server to druid-processing. They probably make more sense there anyway.
  - Adds a SQL query test. Fixes #4957.
* Remove unused import.
* Introduce "transformSpec" at ingest-time. It accepts a "filter" (standard query filter object) and "transforms" (a list of objects with "name" and "expression"). These can be used to do filtering and single-row transforms without the need for a separate data processing job. The "expression" fields use the same expression language as other expression-based features.
It accepts a "filter" (standard query filter object) and "transforms" (a
list of objects with "name" and "expression"). These can be used to do
filtering and single-row transforms without the need for a separate data
processing job.
The "expression" fields use the same expression language as other
expression-based features.
Not yet documented for two reasons:
I'm hoping to address both of these in a later patch, but for now, the docs are unchanged.