This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Describe the bug
When I consume messages from a Pulsar topic that was created and consumed several days ago, I encounter an error:
15:44:30.196 [bookkeeper-ml-workers-OrderedExecutor-0-0] ERROR io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils - Meet exception: {}
java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: Maximum offset delta exceeded, base offset: 2533274790395904, last offset: 7793338417676288
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_251]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_251]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_251]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_251]
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:593) ~[?:1.8.0_251]
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677) ~[?:1.8.0_251]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:158) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[?:1.8.0_251]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_251]
at java.util.stream.ReferencePipeline.forEachOrdered(ReferencePipeline.java:423) ~[?:1.8.0_251]
at java.util.stream.ReferencePipeline$Head.forEachOrdered(ReferencePipeline.java:593) ~[?:1.8.0_251]
at io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils.entriesToRecords(MessageRecordUtils.java:351) ~[?:?]
at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$null$11(MessageFetchContext.java:320) ~[?:?]
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_251]
at java.util.concurrent.ConcurrentHashMap$EntrySpliterator.forEachRemaining(ConcurrentHashMap.java:3606) ~[?:1.8.0_251]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) ~[?:1.8.0_251]
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[?:1.8.0_251]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_251]
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401) ~[?:1.8.0_251]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[?:1.8.0_251]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_251]
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_251]
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) ~[?:1.8.0_251]
at io.streamnative.pulsar.handlers.kop.MessageFetchContext.lambda$readMessagesInternal$13(MessageFetchContext.java:278) ~[?:?]
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) ~[?:1.8.0_251]
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) ~[?:1.8.0_251]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_251]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) ~[?:1.8.0_251]
at io.streamnative.pulsar.handlers.kop.MessageFetchContext$2.readEntriesComplete(MessageFetchContext.java:416) ~[?:?]
at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:152) ~[org.apache.pulsar-managed-ledger-2.6.2.jar:2.6.2]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [org.apache.pulsar-managed-ledger-2.6.2.jar:2.6.2]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at org.apache.bookkeeper.common.util.OrderedExecutor$TimedRunnable.run(OrderedExecutor.java:203) [org.apache.bookkeeper-bookkeeper-common-4.10.0.jar:4.10.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_251]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_251]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.48.Final.jar:4.1.48.Final]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_251]
Caused by: java.lang.IllegalArgumentException: Maximum offset delta exceeded, base offset: 2533274790395904, last offset: 7793338417676288
at org.apache.kafka.common.record.MemoryRecordsBuilder.recordWritten(MemoryRecordsBuilder.java:659) ~[?:?]
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendDefaultRecord(MemoryRecordsBuilder.java:630) ~[?:?]
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:416) ~[?:?]
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:449) ~[?:?]
at io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils.lambda$null$2(MessageRecordUtils.java:393) ~[?:?]
at java.util.stream.ForEachOps$ForEachOp$OfInt.accept(ForEachOps.java:205) ~[?:1.8.0_251]
at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110) ~[?:1.8.0_251]
at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693) ~[?:1.8.0_251]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_251]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOrderedTask.onCompletion(ForEachOps.java:495) ~[?:1.8.0_251]
at java.util.concurrent.CountedCompleter.tryComplete(CountedCompleter.java:577) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOrderedTask.doCompute(ForEachOps.java:483) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOrderedTask.compute(ForEachOps.java:400) ~[?:1.8.0_251]
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[?:1.8.0_251]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_251]
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401) ~[?:1.8.0_251]
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:158) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOp$OfInt.evaluateParallel(ForEachOps.java:189) ~[?:1.8.0_251]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_251]
at java.util.stream.IntPipeline.forEachOrdered(IntPipeline.java:410) ~[?:1.8.0_251]
at java.util.stream.IntPipeline$Head.forEachOrdered(IntPipeline.java:572) ~[?:1.8.0_251]
at io.streamnative.pulsar.handlers.kop.utils.MessageRecordUtils.lambda$entriesToRecords$3(MessageRecordUtils.java:380) ~[?:?]
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_251]
at java.util.stream.Nodes$ArrayNode.forEach(Nodes.java:684) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOrderedTask.onCompletion(ForEachOps.java:490) ~[?:1.8.0_251]
at java.util.concurrent.CountedCompleter.tryComplete(CountedCompleter.java:577) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOrderedTask.onCompletion(ForEachOps.java:505) ~[?:1.8.0_251]
at java.util.concurrent.CountedCompleter.tryComplete(CountedCompleter.java:577) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOrderedTask.onCompletion(ForEachOps.java:505) ~[?:1.8.0_251]
at java.util.concurrent.CountedCompleter.tryComplete(CountedCompleter.java:577) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOrderedTask.onCompletion(ForEachOps.java:505) ~[?:1.8.0_251]
at java.util.concurrent.CountedCompleter.tryComplete(CountedCompleter.java:577) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOrderedTask.doCompute(ForEachOps.java:483) ~[?:1.8.0_251]
at java.util.stream.ForEachOps$ForEachOrderedTask.compute(ForEachOps.java:400) ~[?:1.8.0_251]
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[?:1.8.0_251]
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_251]
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_251]
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_251]
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_251]
Currently KoP converts a Pulsar MessageId to a Kafka offset by:
20 bits ledgerId + 32 bits entryId + 12 bits batchIndex.
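The packing described above can be sketched as follows. This is a minimal illustration, not KoP's actual code: the class name `OffsetPacking` and its methods are hypothetical, and placing the ledgerId in the highest bits is an assumption. That assumption is consistent with the two offsets in the log, which decode to ledger 144 and ledger 443, each with entryId 0 and batchIndex 0.

```java
public class OffsetPacking {
    // Bit widths are taken from the issue text; the field order
    // (ledgerId highest, batchIndex lowest) is an assumption.
    static final int ENTRY_BITS = 32;
    static final int BATCH_BITS = 12;

    // Pack the three MessageId components into one 64-bit offset.
    // Inputs are not masked, so out-of-range values would corrupt neighbouring fields.
    static long pack(long ledgerId, long entryId, int batchIndex) {
        return (ledgerId << (ENTRY_BITS + BATCH_BITS)) | (entryId << BATCH_BITS) | batchIndex;
    }

    static long ledgerId(long offset) {
        return offset >>> (ENTRY_BITS + BATCH_BITS);
    }

    static long entryId(long offset) {
        return (offset >>> BATCH_BITS) & ((1L << ENTRY_BITS) - 1);
    }

    static int batchIndex(long offset) {
        return (int) (offset & ((1L << BATCH_BITS) - 1));
    }

    static String describe(long offset) {
        return ledgerId(offset) + ":" + entryId(offset) + ":" + batchIndex(offset);
    }

    public static void main(String[] args) {
        // The two offsets reported in the exception message.
        System.out.println(describe(2533274790395904L)); // 144:0:0
        System.out.println(describe(7793338417676288L)); // 443:0:0
    }
}
```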
private void recordWritten(long offset, long timestamp, int size) {
    if (numRecords == Integer.MAX_VALUE)
        throw new IllegalArgumentException("Maximum number of records per batch exceeded, max records: " + Integer.MAX_VALUE);
    if (offset - baseOffset > Integer.MAX_VALUE)
        throw new IllegalArgumentException("Maximum offset delta exceeded, base offset: " + baseOffset +
                ", last offset: " + offset);
    // ...
}
It requires the offset delta (the record's offset minus the batch's base offset) to be no greater than Integer.MAX_VALUE.
The offset delta check is in org/apache/kafka/common/record/MemoryRecordsBuilder: https://github.com/apache/kafka/blob/fda67018375ce3f6b90658e1ae9c30ab463e0240/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java#L729
It seems that if the ledgerId changes between the records being converted, this error will occur.
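To make the arithmetic concrete, here is a small sketch that applies the same comparison as the Kafka check to the offsets from the log. The class `OffsetDeltaCheck` and its helper are hypothetical names used only for illustration, and the shift amounts assume the ledgerId occupies the bits above the 32-bit entryId and 12-bit batchIndex.

```java
public class OffsetDeltaCheck {
    // Assumed layout: ledgerId << 44 | entryId << 12 | batchIndex.
    static final int LEDGER_SHIFT = 44;
    static final int ENTRY_SHIFT = 12;

    // Same comparison as the Kafka check: the delta must not exceed Integer.MAX_VALUE.
    static boolean deltaFits(long baseOffset, long lastOffset) {
        return lastOffset - baseOffset <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        long base = 2533274790395904L; // from the exception message
        long last = 7793338417676288L; // from the exception message

        // The logged delta is 299 ledger steps of 2^44 each.
        System.out.println((last - base) == (299L << LEDGER_SHIFT)); // true
        System.out.println(deltaFits(base, last));                    // false

        // Under the assumed layout, a single ledgerId increment already overflows.
        System.out.println(deltaFits(0L, 1L << LEDGER_SHIFT));        // false

        // Within one ledger, entries up to 524287 apart still fit
        // (524287 * 4096 + 4095 == Integer.MAX_VALUE); 524288 apart does not.
        System.out.println(deltaFits(0L, (524287L << ENTRY_SHIFT) | 4095L)); // true
        System.out.println(deltaFits(0L, 524288L << ENTRY_SHIFT));           // false
    }
}
```

Under these assumptions, the ledgerId change is the most likely trigger, but a very large entryId gap within a single ledger would trip the same check.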