Skip to content

[fix][misc] Make ConcurrentBitSet thread safe#22361

Merged
lhotari merged 4 commits intoapache:masterfrom
lhotari:lh-improve-concurrentbitset-thread-safety
Mar 27, 2024
Merged

[fix][misc] Make ConcurrentBitSet thread safe#22361
lhotari merged 4 commits intoapache:masterfrom
lhotari:lh-improve-concurrentbitset-thread-safety

Conversation

@lhotari
Copy link
Copy Markdown
Member

@lhotari lhotari commented Mar 26, 2024

Fixes #22360

Motivation

ConcurrentBitSet isn't thread safe although it claims this in the javadoc. This is causing concurrency issues in Pulsar. See #22360 for examples.

Modifications

  • Fix usage of StampedLock in ConcurrentBitSet
    • modifications must use the write lock
    • read operations must use the read lock

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added type/bug The PR fixed a bug or issue reported a bug ready-to-test labels Mar 26, 2024
@lhotari lhotari added this to the 3.3.0 milestone Mar 26, 2024
@lhotari lhotari self-assigned this Mar 26, 2024
@github-actions github-actions Bot added the doc-not-needed Your PR changes do not impact docs label Mar 26, 2024
@lhotari lhotari requested a review from codelipenghui March 26, 2024 10:46
@lhotari
Copy link
Copy Markdown
Member Author

lhotari commented Mar 26, 2024

StampedLock isn't a reentrant lock. There are dead locks with stack traces such as

"pulsar-io-8-4" #109 prio=5 os_prio=31 cpu=59.46ms elapsed=14.23s tid=0x0000000158d1a600 nid=0x12307 waiting on condition  [0x000000058bd60000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@17.0.10/Native Method)
	- parking to wait for  <0x000020001d61b2a0> (a java.util.concurrent.locks.StampedLock)
	at java.util.concurrent.locks.LockSupport.park(java.base@17.0.10/LockSupport.java:211)
	at java.util.concurrent.locks.StampedLock.acquireRead(java.base@17.0.10/StampedLock.java:1378)
	at java.util.concurrent.locks.StampedLock.readLock(java.base@17.0.10/StampedLock.java:554)
	at org.apache.pulsar.common.util.collections.ConcurrentBitSet.length(ConcurrentBitSet.java:299)
	at java.util.BitSet.previousSetBit(java.base@17.0.10/BitSet.java:799)
	at org.apache.pulsar.common.util.collections.ConcurrentBitSet.previousSetBit(ConcurrentBitSet.java:141)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet.lambda$forEachRawRange$4(ConcurrentOpenLongPairRangeSet.java:219)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet$$Lambda$1521/0x0000007000b627d0.accept(Unknown Source)
	at java.util.concurrent.ConcurrentSkipListMap.forEach(java.base@17.0.10/ConcurrentSkipListMap.java:3030)
	at org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet.forEachRawRange(ConcurrentOpenLongPairRangeSet.java:211)
	at org.apache.bookkeeper.mledger.impl.RangeSetWrapper.forEachRawRange(RangeSetWrapper.java:126)
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.buildIndividualDeletedMessageRanges(ManagedCursorImpl.java:3022)
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.persistPositionToLedger(ManagedCursorImpl.java:3088)
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.internalMarkDelete(ManagedCursorImpl.java:2185)
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.internalAsyncMarkDelete(ManagedCursorImpl.java:2066)
	- locked <0x000020002de7a600> (a java.util.ArrayDeque)
	at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncDelete(ManagedCursorImpl.java:2392)
	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.acknowledgeMessage(PersistentSubscription.java:391)
	at org.apache.pulsar.broker.service.Consumer.individualAckNormal(Consumer.java:535)
	at org.apache.pulsar.broker.service.Consumer.messageAcked(Consumer.java:485)
	at org.apache.pulsar.broker.service.ServerCnx.handleAck(ServerCnx.java:1898)
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:160)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base@17.0.10/Thread.java:840)

@lhotari
Copy link
Copy Markdown
Member Author

lhotari commented Mar 26, 2024

For resolving the deadlock, I removed the locks from the length() method and documented this.
There aren't currently external calls to length() so I think that this is a reasonable solution.
Replacing StampedLock with ReentrantReadWriteLock would have been the other option but since this is code that is part of Pulsar's performance hot spots, it could cause performance regressions. That's why I choose to keep StampedLock and just leave the length() method without an explicit lock. It will get called in other methods currently and that is fine as long as length isn't called externally.

@lhotari lhotari merged commit edd0076 into apache:master Mar 27, 2024
lhotari added a commit that referenced this pull request Mar 27, 2024
lhotari added a commit that referenced this pull request Mar 27, 2024
Technoboy- pushed a commit to Technoboy-/pulsar that referenced this pull request Apr 1, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 15, 2024
(cherry picked from commit edd0076)
(cherry picked from commit 31c9f44)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 17, 2024
(cherry picked from commit edd0076)
(cherry picked from commit 31c9f44)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 17, 2024
(cherry picked from commit edd0076)
(cherry picked from commit 31c9f44)
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 19, 2024
(cherry picked from commit edd0076)
(cherry picked from commit 31c9f44)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 23, 2024
(cherry picked from commit edd0076)
(cherry picked from commit 31c9f44)
hanmz pushed a commit to hanmz/pulsar that referenced this pull request Feb 12, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] ConcurrentBitSet is not thread safe

4 participants