
@sjvanrossum sjvanrossum commented Mar 4, 2025

The offset gap ratio may artificially shrink the backlog if consumers can't catch up to the tail of an expiring topic. This may cause runners to trigger a downscaling event which worsens the issue.

MovingAvg has been modified to atomically write the accumulated state, since concurrent plain loads/stores of longs/doubles may tear. The numUpdates field is only used by the writer and can be kept non-volatile, but the update method ensures that plain loads/stores of numUpdates are ordered relative to acquiring loads and releasing stores of avg. To prevent false sharing, I've padded the class, since there may be tens to hundreds of instances of the accumulator and updates happen per consumed record.
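A minimal sketch of the release/acquire publication scheme described above (hypothetical names and window constant, not Beam's actual MovingAvg): the writer publishes avg with a releasing store, readers use an acquiring load, and the writer-only numUpdates field stays plain because the release on avg orders it as well.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Single-writer moving average with a safely published running value.
class MovingAvgSketch {
  private static final VarHandle AVG;
  static {
    try {
      AVG = MethodHandles.lookup().findVarHandle(MovingAvgSketch.class, "avg", double.class);
    } catch (ReflectiveOperationException e) {
      throw new ExceptionInInitializerError(e);
    }
  }

  private double avg;     // published via AVG.setRelease, read via AVG.getAcquire
  private int numUpdates; // writer-only, plain loads/stores suffice

  public void update(double quantity) {
    double prev = (double) AVG.getAcquire(this);
    int n = Math.min(numUpdates + 1, 1000); // invented window cap for illustration
    numUpdates = n;                         // plain store, ordered by the release below
    AVG.setRelease(this, prev + (quantity - prev) / n);
  }

  public double get() {
    return (double) AVG.getAcquire(this);
  }
}
```

A concurrent reader calling get() either sees the value from before or after an in-flight update(), never a torn double.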

The JMH benchmark I've added shows a slight improvement in average time per op (lower ns/op) for both reads and writes compared to the current implementation.

Results of task :sdks:java:io:kafka:jmh:jmh on a t2d-standard-60 Cloud Workstation:

Benchmark                                 Mode  Cnt       Score       Error  Units
KafkaIOUtilsBenchmark.Atomic              avgt   15   50693.751 ±  4937.770  ns/op
KafkaIOUtilsBenchmark.Atomic:atomicRead   avgt   15    5577.357 ±  1135.962  ns/op
KafkaIOUtilsBenchmark.Atomic:atomicWrite  avgt   15  140926.539 ± 14847.095  ns/op
KafkaIOUtilsBenchmark.Plain               avgt   15   65018.754 ±  9814.457  ns/op
KafkaIOUtilsBenchmark.Plain:plainRead     avgt   15    6658.736 ±   288.912  ns/op
KafkaIOUtilsBenchmark.Plain:plainWrite    avgt   15  181738.789 ± 29403.883  ns/op

Note that this test likely does not highlight the effect of padding since it doesn't construct a large pool of accumulators.
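For reference, class-hierarchy padding of the kind mentioned in the description can be sketched like this (a hypothetical layout; real cache-line sizes vary and the padding used in this PR may have differed):

```java
// Pad fields in superclasses so the hot fields sit on their own cache line
// and neighboring accumulators in a pool don't false-share.
class PrePad { long p00, p01, p02, p03, p04, p05, p06, p07; }      // ~64 bytes before
class HotFields extends PrePad { double avg; long numUpdates; }    // the contended state
class PaddedMovingAvg extends HotFields {
  long q00, q01, q02, q03, q04, q05, q06, q07;                     // ~64 bytes after
}
```

The superclass/subclass split is the classic trick to stop the JVM from reordering the padding fields next to each other; jdk.internal.vm.annotation.@Contended achieves the same but requires JVM flags.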


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.


github-actions bot commented Mar 4, 2025

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@sjvanrossum

Run Spotless PreCommit


github-actions bot commented Mar 5, 2025

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java.
R: @Abacn for label build.
R: @Abacn for label io.
R: @johnjcasey for label kafka.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).


@kennknowles kennknowles left a comment


Happy for the improvement. I may be misunderstanding what is strictly necessary to make your changes work as intended but TL;DR the inheritance all seems extraneous - one of them seems like inlining is equivalent and clearer, while the other seems like it is more clearly expressed as a field.


sjvanrossum commented Mar 11, 2025

@kennknowles I've updated the experiment setup in the benchmark. I've reduced concurrent readers to 1 because, as far as I know, only the ProcessElement thread reads and writes the accumulator, while a GetSize thread occasionally reads it concurrently in the SDF implementation; the unbounded source implementation is similar or entirely single-threaded. I've also split the tests into concurrent reads and writes versus isolated reads and writes, because those concurrent reads only happen every so often and only occasionally overlap with writes (observed in #32921 rarely triggering ConcurrentModificationException). With multiple concurrent readers I had observed a 10-20% improvement with layout padding, but the improvement is now barely noticeable, if at all (on my workstation).

Benchmark                                                                               Mode  Cnt   Score   Error  Units
KafkaIOUtilsBenchmark.ReadAndWriteAtomic                                                avgt   15  12.948 ± 1.999  ns/op
KafkaIOUtilsBenchmark.ReadAndWriteAtomic:atomicReadWhileWriting                         avgt   15   4.798 ± 0.200  ns/op
KafkaIOUtilsBenchmark.ReadAndWriteAtomic:atomicWriteWhileReading                        avgt   15  21.098 ± 3.811  ns/op
KafkaIOUtilsBenchmark.ReadAndWritePaddedAtomic                                          avgt   15  13.676 ± 1.509  ns/op
KafkaIOUtilsBenchmark.ReadAndWritePaddedAtomic:paddedAtomicReadWhileWriting             avgt   15   4.598 ± 0.064  ns/op
KafkaIOUtilsBenchmark.ReadAndWritePaddedAtomic:paddedAtomicWriteWhileReading            avgt   15  22.755 ± 3.075  ns/op
KafkaIOUtilsBenchmark.ReadAndWritePlain                                                 avgt   15  15.909 ± 1.780  ns/op
KafkaIOUtilsBenchmark.ReadAndWritePlain:plainReadWhileWriting                           avgt   15   4.058 ± 0.160  ns/op
KafkaIOUtilsBenchmark.ReadAndWritePlain:plainWriteWhileReading                          avgt   15  27.760 ± 3.589  ns/op
KafkaIOUtilsBenchmark.ReadAndWriteSynchronizedPlain                                     avgt   15  95.190 ± 1.845  ns/op
KafkaIOUtilsBenchmark.ReadAndWriteSynchronizedPlain:synchronizedPlainReadWhileWriting   avgt   15  98.039 ± 3.973  ns/op
KafkaIOUtilsBenchmark.ReadAndWriteSynchronizedPlain:synchronizedPlainWriteWhileReading  avgt   15  92.341 ± 2.358  ns/op
KafkaIOUtilsBenchmark.ReadAndWriteVolatile                                              avgt   15  26.432 ± 4.415  ns/op
KafkaIOUtilsBenchmark.ReadAndWriteVolatile:volatileReadWhileWriting                     avgt   15   3.879 ± 0.056  ns/op
KafkaIOUtilsBenchmark.ReadAndWriteVolatile:volatileWriteWhileReading                    avgt   15  48.984 ± 8.849  ns/op
KafkaIOUtilsBenchmark.ReadAtomic                                                        avgt   15   2.185 ± 0.007  ns/op
KafkaIOUtilsBenchmark.ReadPaddedAtomic                                                  avgt   15   2.193 ± 0.010  ns/op
KafkaIOUtilsBenchmark.ReadPlain                                                         avgt   15   2.206 ± 0.011  ns/op
KafkaIOUtilsBenchmark.ReadSynchronizedPlain                                             avgt   15   7.975 ± 0.571  ns/op
KafkaIOUtilsBenchmark.ReadVolatile                                                      avgt   15   2.199 ± 0.011  ns/op
KafkaIOUtilsBenchmark.WriteAtomic                                                       avgt   15   8.183 ± 0.003  ns/op
KafkaIOUtilsBenchmark.WritePaddedAtomic                                                 avgt   15   8.183 ± 0.003  ns/op
KafkaIOUtilsBenchmark.WritePlain                                                        avgt   15   9.592 ± 0.746  ns/op
KafkaIOUtilsBenchmark.WriteSynchronizedPlain                                            avgt   15  11.145 ± 1.714  ns/op
KafkaIOUtilsBenchmark.WriteVolatile                                                     avgt   15  11.967 ± 0.003  ns/op

I agree that it's not worth the hassle to maintain unless there's a significant upside so I've removed the layout padding from MovingAvg based on the results above.

@sjvanrossum sjvanrossum force-pushed the kafkaio-sdf-backlog-estimation branch from 0ba68c2 to e90f8fc on March 14, 2025 12:54
@sjvanrossum sjvanrossum requested a review from kennknowles March 14, 2025 15:51
continue;
}

long offsetGap = offset - expected; // could be > 0 when Kafka log compaction is enabled.
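For illustration (a hypothetical standalone sketch, not the surrounding Beam code): with log compaction enabled, consumed offsets may skip values, so a record's offset can exceed the expected previous + 1, and the sum of these gaps is what the removed ratio was derived from.

```java
class OffsetGapExample {
  // Sums the gaps between consecutive consumed offsets; the total is > 0
  // when log compaction has removed records from the polled range.
  static long totalOffsetGap(long[] consumedOffsets) {
    long expected = consumedOffsets[0];
    long gap = 0;
    for (long offset : consumedOffsets) {
      gap += offset - expected; // 0 for contiguous records
      expected = offset + 1;
    }
    return gap;
  }

  public static void main(String[] args) {
    // offsets 102-104 were removed by compaction
    System.out.println(totalOffsetGap(new long[] {100, 101, 105, 106})); // prints 3
  }
}
```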

I admit I don't know this well enough to know why we tracked this or why it can be removed, even with your description in the PR. I trust your experiments, and I don't see this being a data integrity issue, though. I'd love to be educated at some point.

@kennknowles kennknowles merged commit e71fdce into apache:master Mar 18, 2025
23 checks passed