[Bug] With many message holes from delayed messages, subscribe is very slow and occupies the IO thread #17451

@leizhiyuan

Description

Search before asking

  • I searched in the issues and found nothing similar.

Version

master

Minimal reproduce step

Keep a large number of message holes in memory.

What did you expect to see?

The pulsar-io thread should not be blocked; blocking it makes the subscribe fail with a timeout.

What did you see instead?

[4] Busy(98.0%) thread(41196/0xa0ec) stack of java process(36835) under user(root):
"pulsar-io-23-20" #347 prio=5 os_prio=0 tid=0x00007f831c027000 nid=0xa0ec runnable [0x00007f817f62b000]
   java.lang.Thread.State: RUNNABLE
        at com.google.common.collect.Range.compareOrThrow(Range.java:712)
        at com.google.common.collect.Cut.compareTo(Cut.java:77)
        at com.google.common.collect.Range.<init>(Range.java:355)
        at com.google.common.collect.Range.create(Range.java:156)
        at com.google.common.collect.Range.openClosed(Range.java:205)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet.lambda$forEach$4(ConcurrentOpenLongPairRangeSet.java:212)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet$$Lambda$623/2008085945.accept(Unknown Source)
        at java.util.concurrent.ConcurrentSkipListMap.forEach(ConcurrentSkipListMap.java:3269)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet.forEach(ConcurrentOpenLongPairRangeSet.java:200)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNumberOfEntries(ManagedCursorImpl.java:1321)
        at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.getNumberOfEntries(ManagedCursorImpl.java:866)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$checkBackloggedCursors$78(PersistentTopic.java:2148)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$646/893850913.accept(Unknown Source)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:387)
        at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkBackloggedCursors(PersistentTopic.java:2146)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$11(PersistentTopic.java:702)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$636/490623965.accept(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
        at java.util.concurrent.CompletableFuture.uniAcceptStage(CompletableFuture.java:669)
        at java.util.concurrent.CompletableFuture.thenAccept(CompletableFuture.java:1997)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:696)
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:587)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$13(ServerCnx.java:976)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$609/1007138891.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:981)
        at java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2124)
        at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$16(ServerCnx.java:942)
        at org.apache.pulsar.broker.service.ServerCnx$$Lambda$603/1291454277.apply(Unknown Source)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
        at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
        at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
        at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:891)
        at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:257)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
        at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
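
Reading the trace from the bottom up, a subscribe request handled on a pulsar-io thread reaches checkBackloggedCursors, which calls getNumberOfEntries for each cursor, which in turn walks every range in ConcurrentOpenLongPairRangeSet. The following is a simplified model of that cost using only the JDK, not Pulsar's actual code: the class name, the TreeMap representation, and both methods are hypothetical. It only illustrates that a count done by scanning ranges does work proportional to the number of holes.

```java
import java.util.Map;
import java.util.TreeMap;

public class HoleScanModel {
    // Hypothetical stand-in for a cursor's acknowledged ranges:
    // key = range start (exclusive), value = range end (inclusive).
    static long countBacklog(long firstEntry, long lastEntry, TreeMap<Long, Long> acked) {
        long total = lastEntry - firstEntry + 1;
        long ackedCount = 0;
        // One iteration per range: the more holes, the longer this loop runs.
        for (Map.Entry<Long, Long> range : acked.entrySet()) {
            long lo = Math.max(range.getKey() + 1, firstEntry);
            long hi = Math.min(range.getValue(), lastEntry);
            if (lo <= hi) {
                ackedCount += hi - lo + 1;
            }
        }
        return total - ackedCount;
    }

    // Acknowledge every other entry so that no two ranges can be merged.
    static TreeMap<Long, Long> ackEveryOther(long entries) {
        TreeMap<Long, Long> acked = new TreeMap<>();
        for (long e = 0; e < entries; e += 2) {
            acked.put(e - 1, e); // the range (e-1, e] covers only entry e
        }
        return acked;
    }
}
```

In this model, acknowledging every other entry out of N leaves about N/2 separate ranges, so each backlog count touches N/2 map entries. Repeating that for every cursor on every subscribe is the kind of work that can keep one IO thread busy.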

Anything else?

We should not run long-running work on the pulsar-io thread.

make this method
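
One possible direction, sketched here with plain JDK classes, is to hand the slow computation to a separate executor and continue from the returned future. This is not the change proposed for Pulsar: the class, the method names, and the "backlog-check" thread name are all hypothetical, and which executor the broker should use is left open.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class OffloadSketch {
    // Hypothetical helper: a single named daemon thread for slow work.
    static ExecutorService newWorker(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    // Runs slowWork on the worker instead of the calling (IO) thread.
    // The caller gets a future back immediately and is free to serve
    // other requests while the work is in progress.
    static <T> CompletableFuture<T> offload(Supplier<T> slowWork, ExecutorService worker) {
        return CompletableFuture.supplyAsync(slowWork, worker);
    }
}
```

A caller would chain follow-up steps onto the returned future (for example with thenAccept) rather than blocking on it, since blocking on the result from the IO thread would bring the original problem back.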

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels

type/bug
