Support csv input format in Kafka ingestion with header #16630

kfaraz merged 6 commits into apache:master
Conversation
AmatyaAvadhanula
left a comment
Thanks for the fix, @kfaraz. LGTM!
```diff
- // Return type for the value parser should be of type MapBasedInputRow
- // Parsers returning other types are not compatible currently.
- valueRow = (MapBasedInputRow) r;
+ if (r instanceof ListBasedInputRow) {
```
I think the performance implication of this `if` should be okay.
Anyway, can we add a UT for this?
cryptoe
left a comment
Let's add a UT for the above change.
clintropolis
left a comment
What if we just made `buildBlendedEventMap` less picky about stuff? I attached a patch that instead makes `buildBlendedEventMap` look something like this:
```java
private static Map<String, Object> buildBlendedEventMap(
    Function<String, Object> getRowValue,
    Set<String> rowDimensions,
    Map<String, Object> fallback
)
```
so then usage is like:
```java
return valueParser.read().map(
    r -> {
      final HashSet<String> newDimensions = new HashSet<>(r.getDimensions());
      final Map<String, Object> event = buildBlendedEventMap(r::getRaw, newDimensions, headerKeyList);
      ...
```
I didn't test much, but KafkaInputFormatTest passed, minus the parse exception test failing due to a different message from different key ordering, though that's probably easy to fix.
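To illustrate the semantics the suggested `buildBlendedEventMap` signature implies — row values shadowing entries from a fallback map of Kafka header/key columns — here is a minimal plain-Java sketch. This is not Druid's actual implementation; the class name, the precedence rule (row wins over fallback), and the sample column names are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class BlendedEventMapSketch
{
  // Build an event map where values looked up from the row (via getRowValue)
  // take precedence over entries in the fallback map (e.g. Kafka header values).
  // Assumption: "fallback" means exactly that — it only fills columns the row lacks.
  public static Map<String, Object> buildBlendedEventMap(
      Function<String, Object> getRowValue,
      Set<String> rowDimensions,
      Map<String, Object> fallback
  )
  {
    final Map<String, Object> event = new HashMap<>(fallback);
    for (String dimension : rowDimensions) {
      event.put(dimension, getRowValue.apply(dimension));
    }
    return event;
  }

  public static void main(String[] args)
  {
    // Hypothetical row values and Kafka header fallback.
    Map<String, Object> rowValues = new HashMap<>();
    rowValues.put("page", "druid-wiki");
    Map<String, Object> headers = new HashMap<>();
    headers.put("kafka.header.env", "prod");
    headers.put("page", "should-be-shadowed");

    Map<String, Object> event = buildBlendedEventMap(
        rowValues::get,
        new LinkedHashSet<>(rowValues.keySet()),
        headers
    );
    System.out.println(event.get("page"));             // druid-wiki
    System.out.println(event.get("kafka.header.env")); // prod
  }
}
```

The `Function<String, Object>` accessor is what makes the blend work for any row type: a map-based row can pass `map::get` while a list-based row can pass its own positional lookup, so the caller no longer needs a `MapBasedInputRow` cast.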
Thanks for the patch, @clintropolis! I have tested out the changes (with a minor tweak for sampling) and updated the PR accordingly. It works as expected.
clintropolis
left a comment
Might be nice to add a test that uses csv to KafkaInputFormatTest.
Also, I think that the keyFormat might have a similar problem,
which can be changed to something like this:
```java
InputRow keyRow = keyIterator.next();
// Add the key to the mergeList only if the key string is not already present
mergedHeaderMap.putIfAbsent(
    keyColumnName,
    keyRow.getRaw(Iterables.getOnlyElement(keyRow.getDimensions()))
);
```
if we also change KafkaInputFormat.java key parser thingy to not use the regular input schema,
to something like
```java
(keyFormat == null) ?
    null :
    record ->
        (record.getRecord().key() == null) ?
            null :
            JsonInputFormat.withLineSplittable(keyFormat, false).createReader(
                new InputRowSchema(
                    dummyTimestampSpec,
                    DimensionsSpec.EMPTY,
                    null
                ),
                new ByteEntity(record.getRecord().key()),
                temporaryDirectory
            ),
```
@clintropolis, I have added a test for CSV values. Do you think it would be okay if we fix the handling of the key format in a follow-up PR?
clintropolis
left a comment
Changes overall LGTM, it's fine to do the other fix as a follow-up.
Thanks for the reviews, @AmatyaAvadhanula, @clintropolis!
I believe this is worth mentioning in the release notes 👍 |
Follow-up to apache#16630, which fixed a similar issue for the valueFormat:
* KafkaInputFormat: Fix handling of CSV/TSV keyFormat.
* Simplify.
Description
When Kafka ingestion is set up with `ioConfig.type = kafka` (i.e. "Parse Kafka metadata" enabled) and the csv input format, we get a parse exception while both sampling the data and running an actual ingestion task. This error eventually fails the sampling, and ingestion simply rejects the events due to the parse exception.
The root cause is that `KafkaInputReader` expects the input rows to be `MapBasedInputRow`s so that it may use the event map to blend the values with headers and keys.
Changes
* Convert `ListBasedInputRow` to `MapBasedInputRow` using `.asMap()` while building blended rows that contain values from Kafka headers, key and value.
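Conceptually, a list-based row (what a csv parser naturally produces) stores dimension names and values as parallel lists, while blending with headers and keys needs a map. A minimal plain-Java sketch of that conversion — an illustration of the idea behind `ListBasedInputRow.asMap()`, with assumed names, not Druid's actual code:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ListToMapRowSketch
{
  // Zip the parallel dimension/value lists into a map, preserving column
  // order. Missing trailing values (a short csv line) map to null.
  public static Map<String, Object> asMap(List<String> dimensions, List<Object> values)
  {
    final Map<String, Object> map = new LinkedHashMap<>();
    for (int i = 0; i < dimensions.size(); i++) {
      map.put(dimensions.get(i), i < values.size() ? values.get(i) : null);
    }
    return map;
  }

  public static void main(String[] args)
  {
    // A hypothetical csv record parsed positionally against its column list.
    Map<String, Object> row = asMap(
        List.of("timestamp", "page", "user"),
        List.of("2024-01-01", "druid-wiki", "alice")
    );
    System.out.println(row); // {timestamp=2024-01-01, page=druid-wiki, user=alice}
  }
}
```

Once the row is in map form, the same blending path used for JSON values (headers and key merged around the event map) works unchanged for csv.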
Screenshot after the fix
Testing
* `KafkaInputFormatTest` with csv record payload

Release note
Allow use of the csv input format with a Kafka record when "Parse Kafka metadata" is also enabled.
This PR has: