Affected Version
0.23.0+
Description
The KafkaInputFormat added in #11630 allows extracting additional data from Kafka messages to ingest into Druid. However, it does not work with the Druid sampler API, which fails with:
2023-03-03T07:36:02,271 ERROR [qtp1379563521-109] org.apache.druid.indexing.overlord.sampler.SamplerExceptionMapper - Failed to sample data: org.apache.druid.data.input.kafka.KafkaRecordEntity cannot be cast to org.apache.druid.indexing.seekablestream.SettableByteEntity
org.apache.druid.indexing.overlord.sampler.SamplerException: Failed to sample data: org.apache.druid.data.input.kafka.KafkaRecordEntity cannot be cast to org.apache.druid.indexing.seekablestream.SettableByteEntity
at org.apache.druid.indexing.overlord.sampler.InputSourceSampler.sample(InputSourceSampler.java:281) ~[classes/:?]
at org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec.sample(SeekableStreamSamplerSpec.java:116) ~[classes/:?]
at org.apache.druid.indexing.overlord.sampler.SamplerResource.post(SamplerResource.java:43) ~[classes/:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_332]
...
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_332]
Caused by: java.lang.ClassCastException: org.apache.druid.data.input.kafka.KafkaRecordEntity cannot be cast to org.apache.druid.indexing.seekablestream.SettableByteEntity
at org.apache.druid.data.input.kafkainput.KafkaInputFormat.createReader(KafkaInputFormat.java:84) ~[?:?]
at org.apache.druid.data.input.impl.InputEntityIteratingReader.lambda$sample$1(InputEntityIteratingReader.java:97) ~[classes/:?]
at org.apache.druid.java.util.common.parsers.CloseableIterator$2.findNextIteratorIfNecessary(CloseableIterator.java:84) ~[classes/:?]
at org.apache.druid.java.util.common.parsers.CloseableIterator$2.<init>(CloseableIterator.java:69) ~[classes/:?]
at org.apache.druid.java.util.common.parsers.CloseableIterator.flatMap(CloseableIterator.java:67) ~[classes/:?]
at org.apache.druid.data.input.impl.InputEntityIteratingReader.createIterator(InputEntityIteratingReader.java:108) ~[classes/:?]
at org.apache.druid.data.input.impl.InputEntityIteratingReader.sample(InputEntityIteratingReader.java:94) ~[classes/:?]
at org.apache.druid.data.input.impl.TimedShutoffInputSourceReader.sample(TimedShutoffInputSourceReader.java:62) ~[classes/:?]
at org.apache.druid.segment.transform.TransformingInputSourceReader.sample(TransformingInputSourceReader.java:50) ~[classes/:?]
at org.apache.druid.indexing.overlord.sampler.InputSourceSampler.sample(InputSourceSampler.java:129) ~[classes/:?]
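The sampler API is also what powers the web-console data loader, so the data loader is affected too. The failure happens because KafkaInputFormat.createReader assumes a SettableByteEntityReader is involved and casts its source to SettableByteEntity unconditionally, whereas the sampler hands it a KafkaRecordEntity directly.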
This can be resolved by modifying KafkaInputFormat.createReader to something like:
    if (source instanceof SettableByteEntity) {
      // Normal ingestion path: the source is already wrapped.
      settableByteEntitySource = (SettableByteEntity<KafkaRecordEntity>) source;
    } else {
      // Sampler path: wrap the bare KafkaRecordEntity.
      settableByteEntitySource = new SettableByteEntity<>();
      settableByteEntitySource.setEntity((KafkaRecordEntity) source);
    }
This wraps the entity in the SettableByteEntity needed by the KafkaInputReader, which allows the sampler (and the data loader UI) to function correctly.
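
To illustrate the difference between the unconditional cast and the proposed wrap-if-needed check, here is a minimal standalone sketch. The classes in it are simplified stand-ins that only borrow the names from the stack trace; they are not the real Druid types, and `castOnly` / `wrapIfNeeded` are hypothetical helper names that do not exist in Druid.

```java
public class SettableEntityWrapSketch {
    // Stand-in for Druid's input entity abstraction (assumption, not the real interface).
    interface InputEntity {}

    // Stand-in for KafkaRecordEntity: carries a payload only.
    static final class KafkaRecordEntity implements InputEntity {
        final String payload;
        KafkaRecordEntity(String payload) { this.payload = payload; }
    }

    // Stand-in for SettableByteEntity: a mutable holder around another entity.
    static final class SettableByteEntity<T extends InputEntity> implements InputEntity {
        private T entity;
        void setEntity(T entity) { this.entity = entity; }
        T getEntity() { return entity; }
    }

    // Mirrors the failing behavior: cast without checking the runtime type.
    @SuppressWarnings("unchecked")
    static SettableByteEntity<KafkaRecordEntity> castOnly(InputEntity source) {
        return (SettableByteEntity<KafkaRecordEntity>) source;
    }

    // Mirrors the proposed fix: reuse an existing wrapper, otherwise create one.
    @SuppressWarnings("unchecked")
    static SettableByteEntity<KafkaRecordEntity> wrapIfNeeded(InputEntity source) {
        if (source instanceof SettableByteEntity) {
            return (SettableByteEntity<KafkaRecordEntity>) source;
        }
        SettableByteEntity<KafkaRecordEntity> wrapped = new SettableByteEntity<>();
        wrapped.setEntity((KafkaRecordEntity) source);
        return wrapped;
    }

    public static void main(String[] args) {
        InputEntity raw = new KafkaRecordEntity("hello");
        try {
            castOnly(raw);
            System.out.println("cast succeeded");
        } catch (ClassCastException e) {
            System.out.println("unconditional cast failed"); // this branch runs
        }
        System.out.println(wrapIfNeeded(raw).getEntity().payload); // prints "hello"
    }
}
```

In this sketch, an already-wrapped source is returned as the same instance, so the regular ingestion path is unchanged and only the bare-entity case (the one the sampler hits) gets a new wrapper.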
