Affected Version
master
Description
When I use the 'lookup' expression function in a transformSpec to resolve an attribute at ingestion time, the Kafka supervisor fails to create its tasks with the following error:
2019-05-21T21:06:54,247 ERROR [KafkaSupervisor-example] org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor - SeekableStreamSupervisor[example] failed to handle notice: {class=org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor, exceptionType=class org.apache.druid.java.util.common.RE, exceptionMessage=function 'lookup' is not defined., noticeClass=RunNotice}
org.apache.druid.java.util.common.RE: function 'lookup' is not defined.
at org.apache.druid.math.expr.ExprListenerImpl.exitFunctionExpr(ExprListenerImpl.java:303) ~[druid-core-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.math.expr.antlr.ExprParser$FunctionExprContext.exitRule(ExprParser.java:212) ~[druid-core-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.antlr.v4.runtime.tree.ParseTreeWalker.exitRule(ParseTreeWalker.java:71) ~[antlr4-runtime-4.5.1.jar:4.5.1]
at org.antlr.v4.runtime.tree.ParseTreeWalker.walk(ParseTreeWalker.java:54) ~[antlr4-runtime-4.5.1.jar:4.5.1]
at org.apache.druid.math.expr.Parser.parse(Parser.java:85) ~[druid-core-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.math.expr.Parser.parse(Parser.java:72) ~[druid-core-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.segment.transform.ExpressionTransform.getRowFunction(ExpressionTransform.java:68) ~[druid-processing-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.segment.transform.Transformer.<init>(Transformer.java:50) ~[druid-processing-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.segment.transform.TransformSpec.toTransformer(TransformSpec.java:122) ~[druid-processing-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.segment.transform.TransformingStringInputRowParser.<init>(TransformingStringInputRowParser.java:44) ~[druid-processing-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.segment.transform.TransformSpec.decorate(TransformSpec.java:108) ~[druid-processing-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.segment.indexing.DataSchema.getParser(DataSchema.java:125) ~[druid-server-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.<init>(SeekableStreamIndexTask.java:102) ~[druid-indexing-service-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.indexing.kafka.KafkaIndexTask.<init>(KafkaIndexTask.java:70) ~[?:?]
at org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor.createIndexTasks(KafkaSupervisor.java:246) ~[?:?]
at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.createTasksForGroup(SeekableStreamSupervisor.java:2492) ~[druid-indexing-service-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.createNewTasks(SeekableStreamSupervisor.java:2306) ~[druid-indexing-service-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.runInternal(SeekableStreamSupervisor.java:1012) ~[druid-indexing-service-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor$RunNotice.handle(SeekableStreamSupervisor.java:264) ~[druid-indexing-service-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.lambda$tryInit$3(SeekableStreamSupervisor.java:723) ~[druid-indexing-service-0.15.0-incubating-SNAPSHOT.jar:0.15.0-incubating-SNAPSHOT]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_181]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_181]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_181]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_181]
Here is the ingestion spec:
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "example",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "posix"
        },
        "flattenSpec": {
          "fields": [
            {
              "type": "path",
              "name": "item_id",
              "expr": "$.parameters.item_id"
            }
          ]
        },
        "dimensionsSpec": {
          "dimensions": [
            "item_id",
            "item_lang"
          ],
          "dimensionExclusions": []
        }
      }
    },
    "metricsSpec": [
      {
        "type": "count",
        "name": "count"
      }
    ],
    "transformSpec": {
      "transforms": [
        {
          "type": "expression",
          "name": "item_lang",
          "expression": "lookup(item_id, 'item-lang-lookup')"
        }
      ]
    },
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "rollup": true
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 2000000
  },
  "ioConfig": {
    "topic": "topic",
    "consumerProperties": {
      "bootstrap.servers": "kafka:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H",
    "useEarliestOffset": true
  }
}
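To make the expected behavior concrete, here is a rough Python sketch of what I would expect the flattenSpec and the transform to do to each incoming row. This is not Druid code: the in-memory table, its contents, the sample event, and the helper names (flatten, apply_transform) are all made up for illustration, and I am assuming a key with no match resolves to null.

```python
# Hypothetical in-memory stand-in for the 'item-lang-lookup' lookup;
# the keys and values here are invented for illustration only.
ITEM_LANG_LOOKUP = {"42": "en", "43": "de"}


def flatten(event):
    """Mimic the flattenSpec: expose $.parameters.item_id as top-level item_id."""
    row = dict(event)
    row["item_id"] = event.get("parameters", {}).get("item_id")
    return row


def apply_transform(row, table=ITEM_LANG_LOOKUP):
    """Mimic the intended transform: item_lang = lookup(item_id, 'item-lang-lookup')."""
    out = dict(row)
    # Assumption: an item_id with no entry in the table yields None (null).
    out["item_lang"] = table.get(row.get("item_id"))
    return out


# Made-up sample event shaped like the messages the spec expects.
event = {"timestamp": 1558472814, "parameters": {"item_id": "42"}}
row = apply_transform(flatten(event))
print(row["item_id"], row["item_lang"])
```

In other words, item_lang should end up as a regular dimension derived from item_id, rather than the supervisor rejecting the expression while parsing the spec.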