Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
This repository was archived by the owner on Jan 24, 2024. It is now read-only.

[BUG] kafka-client stuck when reading data #283

@zymap

Description

@zymap

Pulsar version: 2.7.0
KoP version: 2.7.0

Describe the bug
The Kafka client gets stuck when reading data.

To Reproduce
Steps to reproduce the behavior:

  1. Enable KoP in Pulsar standalone
  2. Send messages and wait for the ledger to change.
  3. Start a consumer that consumes from the beginning.
  4. See the errors in the log below.

Additional context
Error log:

19:42:28.667 [bookkeeper-ml-workers-OrderedExecutor-3-0] ERROR io.streamnative.pulsar.handlers.kop.MessageFetchContext - Request RequestHeader(apiKey=FETCH, apiVersion=8, clientId=consumer-1, correlationId=31): Failed readEntry.get for topic: kop-offload-test-0.
java.util.concurrent.ExecutionException: org.apache.bookkeeper.mledger.ManagedLedgerException: Unknown exception
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) ~[?:1.8.0_201]
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) ~[?:1.8.0_201]
    at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$null$9(MessageFetchContext.java:205) ~[?:?]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_201]
    at java.util.concurrent.ConcurrentHashMap$EntrySpliterator.forEachRemaining(ConcurrentHashMap.java:3606) ~[?:1.8.0_201]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) ~[?:1.8.0_201]
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[?:1.8.0_201]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_201]
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_201]
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) ~[?:1.8.0_201]
    at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$readMessagesInternal$13(MessageFetchContext.java:201) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_201]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_201]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_201]
    at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_201]
    at io.streamnative.pulsar.handlers.kop.MessageFetchContext$2.readEntriesFailed(MessageFetchContext.java:430) ~[?:?]
    at org.apache.bookkeeper.mledger.impl.OpReadEntry.readEntriesFailed(OpReadEntry.java:122) ~[org.apache.pulsar-managed-ledger-2.7.0.jar:2.7.0]
    at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$asyncReadEntry0$3(EntryCacheImpl.java:308) ~[org.apache.pulsar-managed-ledger-2.7.0.jar:2.7.0]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) [?:1.8.0_201]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) [?:1.8.0_201]
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) [?:1.8.0_201]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.12.0.jar:4.12.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_201]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.51.Final.jar:4.1.51.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: org.apache.bookkeeper.mledger.ManagedLedgerException: Unknown exception
19:42:29.189 [pulsar-io-36-18] INFO  org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl - [public/default/persistent/kop-offload-test-partition-0] Created non-durable cursor read-position=1:20 mark-delete-position=1:19
19:42:29.189 [pulsar-io-36-18] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/kop-offload-test-partition-0] Opened new cursor: NonDurableCursorImpl{ledger=public/default/persistent/kop-offload-test-partition-0, ackPos=1:19, readPos=1:20}
19:42:31.248 [bookkeeper-ml-workers-OrderedExecutor-3-0] ERROR io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils - Meet exception: {}
java.lang.IllegalArgumentException: Maximum offset delta exceeded, base offset: 17592186126336, last offset: 35184372088832
    at org.apache.kafka.common.record.MemoryRecordsBuilder.recordWritten(MemoryRecordsBuilder.java:659) ~[?:?]
    at org.apache.kafka.common.record.MemoryRecordsBuilder.appendDefaultRecord(MemoryRecordsBuilder.java:630) ~[?:?]
    at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:416) ~[?:?]
    at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:449) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils.lambda$null$2(MessageRecordUtils.java:394) ~[?:?]
    at java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:205) ~[?:1.8.0_201]
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110) ~[?:1.8.0_201]
    at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_201]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_201]
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOrderedTask.onCompletion(ForEachOps.java:495) ~[?:1.8.0_201]
    at java.util.concurrent.CountedCompleter.tryComplete(CountedCompleter.java:577) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOrderedTask.doCompute(ForEachOps.java:483) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOrderedTask.compute(ForEachOps.java:400) ~[?:1.8.0_201]
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:158) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:189) ~[?:1.8.0_201]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_201]
    at java.util.stream.IntPipeline.forEachOrdered(IntPipeline.java:409) ~[?:1.8.0_201]
    at java.util.stream.IntPipeline$Head.forEachOrdered(IntPipeline.java:570) ~[?:1.8.0_201]
    at io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils.lambda$entriesToRecords$3(MessageRecordUtils.java:381) ~[?:?]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_201]
    at java.util.stream.Nodes$ArrayNode.forEach(Nodes.java:684) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOrderedTask.onCompletion(ForEachOps.java:490) ~[?:1.8.0_201]
    at java.util.concurrent.CountedCompleter.tryComplete(CountedCompleter.java:577) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOrderedTask.onCompletion(ForEachOps.java:505) ~[?:1.8.0_201]
    at java.util.concurrent.CountedCompleter.tryComplete(CountedCompleter.java:577) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOrderedTask.doCompute(ForEachOps.java:483) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOrderedTask.compute(ForEachOps.java:400) ~[?:1.8.0_201]
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:158) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[?:1.8.0_201]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_201]
    at java.util.stream.ReferencePipeline.forEachOrdered(ReferencePipeline.java:423) ~[?:1.8.0_201]
    at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:593) ~[?:1.8.0_201]
    at io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils.entriesToRecords(MessageRecordUtils.java:352) ~[?:?]
    at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$null$11(MessageFetchContext.java:327) ~[?:?]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_201]
    at java.util.concurrent.ConcurrentHashMap$EntrySpliterator.forEachRemaining(ConcurrentHashMap.java:3606) ~[?:1.8.0_201]
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) ~[?:1.8.0_201]
    at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401) ~[?:1.8.0_201]
    at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) ~[?:1.8.0_201]
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[?:1.8.0_201]
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_201]
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_201]
    at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) ~[?:1.8.0_201]
    at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$readMessagesInternal$13(MessageFetchContext.java:282) ~[?:?]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) ~[?:1.8.0_201]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) ~[?:1.8.0_201]
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_201]
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_201]
    at io.streamnative.pulsar.handlers.kop.MessageFetchContext$2.readEntriesComplete(MessageFetchContext.java:423) ~[?:?]
    at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:152) ~[org.apache.pulsar-managed-ledger-2.7.0.jar:2.7.0]
    at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.7.0.jar:2.7.0]
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.12.0.jar:4.12.0]
    at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.12.0.jar:4.12.0]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_201]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_201]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.51.Final.jar:4.1.51.Final]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions