Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -171,21 +170,8 @@ private CloseableIterator<InputRow> buildBlendedRows(
{
return valueParser.read().map(
r -> {
final MapBasedInputRow valueRow;
try {
// Return type for the value parser should be of type MapBasedInputRow
// Parsers returning other types are not compatible currently.
valueRow = (MapBasedInputRow) r;
}
catch (ClassCastException e) {
throw new ParseException(
null,
"Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows"
);
}

final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
final HashSet<String> newDimensions = new HashSet<>(r.getDimensions());
final Map<String, Object> event = buildBlendedEventMap(r::getRaw, newDimensions, headerKeyList);
newDimensions.addAll(headerKeyList.keySet());
// Remove the dummy timestamp added in KafkaInputFormat
newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
Expand Down Expand Up @@ -244,25 +230,18 @@ private CloseableIterator<InputRowListPlusRawValues> buildBlendedRowsSample(
}
List<InputRow> newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size());
List<Map<String, Object>> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size());
ParseException parseException = null;

for (Map<String, Object> raw : rowAndValues.getRawValuesList()) {
newRawRows.add(buildBlendedEventMap(raw, headerKeyList));
newRawRows.add(buildBlendedEventMap(raw::get, raw.keySet(), headerKeyList));
}
for (InputRow r : rowAndValues.getInputRows()) {
MapBasedInputRow valueRow = null;
try {
valueRow = (MapBasedInputRow) r;
}
catch (ClassCastException e) {
parseException = new ParseException(
null,
"Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows"
if (r != null) {
final HashSet<String> newDimensions = new HashSet<>(r.getDimensions());
final Map<String, Object> event = buildBlendedEventMap(
r::getRaw,
newDimensions,
headerKeyList
);
}
if (valueRow != null) {
final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
newDimensions.addAll(headerKeyList.keySet());
// Remove the dummy timestamp added in KafkaInputFormat
newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
Expand All @@ -279,7 +258,7 @@ private CloseableIterator<InputRowListPlusRawValues> buildBlendedRowsSample(
);
}
}
return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, parseException);
return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, null);
}
);
}
Expand All @@ -302,22 +281,31 @@ private List<InputRow> buildInputRowsForMap(Map<String, Object> headerKeyList)
/**
* Builds a map that blends two {@link Map}, presenting the combined keyset of both maps, and preferring to read
* from the first map and falling back to the second map if the value is not present.
*
* <p>
* This strategy is used rather than just copying the values of the keyset into a new map so that any 'flattening'
* machinery (such as {@link Map} created by {@link org.apache.druid.java.util.common.parsers.ObjectFlatteners}) is
* still in place to be lazily evaluated instead of eagerly copying.
*/
private static Map<String, Object> buildBlendedEventMap(Map<String, Object> map, Map<String, Object> fallback)
private static Map<String, Object> buildBlendedEventMap(
Function<String, Object> getRowValue,
Set<String> rowDimensions,
Map<String, Object> fallback
)
{
final Set<String> keySet = new HashSet<>(fallback.keySet());
keySet.addAll(map.keySet());
keySet.addAll(rowDimensions);

return new AbstractMap<String, Object>()
{
@Override
public Object get(Object key)
{
return map.getOrDefault((String) key, fallback.get(key));
final String skey = (String) key;
final Object val = getRowValue.apply(skey);
if (val == null) {
return fallback.get(skey);
}
return val;
}

@Override
Expand Down
Loading