NPE when KafkaSupervisor getLagPerPartition() #5009

@elloooooo

Description

2017-10-25T14:46:56,201 ERROR [KafkaSupervisor-m2_sfc_beatles_order_push_public_fmt] io.druid.indexing.kafka.supervisor.KafkaSupervisor - KafkaSupervisor[m2_sfc_beatles_order_push_public_fmt] failed to handle notice: {class=io.druid.indexing.kafka.supervisor.KafkaSupervisor, exceptionType=class java.lang.NullPointerException, exceptionMessage=null, noticeClass=RunNotice}
java.lang.NullPointerException
at java.util.HashMap.merge(HashMap.java:1216) ~[?:1.8.0_77]
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) ~[?:1.8.0_77]
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:1.8.0_77]
at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1683) ~[?:1.8.0_77]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_77]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_77]
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_77]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_77]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_77]
at io.druid.indexing.kafka.supervisor.KafkaSupervisor.getLagPerPartition(KafkaSupervisor.java:1746) ~[druid-kafka-indexing-service-release-0.10.1-101.jar:release-0.10.1-101]
at io.druid.indexing.kafka.supervisor.KafkaSupervisor.generateReport(KafkaSupervisor.java:1625) ~[druid-kafka-indexing-service-release-0.10.1-101.jar:release-0.10.1-101]
at io.druid.indexing.kafka.supervisor.KafkaSupervisor.runInternal(KafkaSupervisor.java:691) ~[druid-kafka-indexing-service-release-0.10.1-101.jar:release-0.10.1-101]
at io.druid.indexing.kafka.supervisor.KafkaSupervisor$RunNotice.handle(KafkaSupervisor.java:518) ~[druid-kafka-indexing-service-release-0.10.1-101.jar:release-0.10.1-101]
at io.druid.indexing.kafka.supervisor.KafkaSupervisor$2.run(KafkaSupervisor.java:336) [druid-kafka-indexing-service-release-0.10.1-101.jar:release-0.10.1-101]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_77]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_77]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_77]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]

Collectors.toMap(
                Map.Entry::getKey,
                e -> latestOffsetsFromKafka != null
                     && latestOffsetsFromKafka.get(e.getKey()) != null
                     && e.getValue() != null
                     ? latestOffsetsFromKafka.get(e.getKey()) - e.getValue()
                     : null
            )

Collectors.toMap throws a NullPointerException when the mapped value is null, because the underlying HashMap.merge does not accept null values. How about replacing the null with -1?
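For illustration, a minimal self-contained sketch of a workaround (class and method names here are hypothetical, not Druid's actual code): collect the lag map with a plain HashMap, which tolerates null values, instead of Collectors.toMap, which routes every entry through HashMap.merge and therefore NPEs on null.

```java
import java.util.HashMap;
import java.util.Map;

public class LagWorkaround {

    // Hypothetical stand-in for KafkaSupervisor.getLagPerPartition():
    // computes (latest Kafka offset - current offset) per partition,
    // leaving the lag as null when either side is unknown.
    public static Map<Integer, Long> lagPerPartition(
            Map<Integer, Long> currentOffsets,
            Map<Integer, Long> latestOffsetsFromKafka) {
        // A plain HashMap accepts null values, so no HashMap.merge NPE.
        Map<Integer, Long> lag = new HashMap<>();
        for (Map.Entry<Integer, Long> e : currentOffsets.entrySet()) {
            Long latest = latestOffsetsFromKafka == null
                    ? null
                    : latestOffsetsFromKafka.get(e.getKey());
            lag.put(e.getKey(),
                    latest != null && e.getValue() != null
                            ? latest - e.getValue()
                            : null);
        }
        return lag;
    }

    public static void main(String[] args) {
        Map<Integer, Long> current = new HashMap<>();
        current.put(0, 100L);
        current.put(1, null); // current offset not yet known for partition 1
        Map<Integer, Long> latest = new HashMap<>();
        latest.put(0, 150L);

        Map<Integer, Long> lag = lagPerPartition(current, latest);
        System.out.println(lag.get(0)); // 50
        System.out.println(lag.get(1)); // null
    }
}
```

The -1 sentinel suggested above would also avoid the NPE (return -1L instead of null in the ternary), at the cost of making "unknown lag" indistinguishable from a real negative value in the report.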
