Skip to content

Conversation

@ryucc
Copy link
Contributor

@ryucc ryucc commented Apr 14, 2023

Add JavaDoc to BundleManager.

  1. Add JavaDoc
  2. Rename some tests

This was a repurposed PR.

Original description: name tryStartBundle to countNewElement.
The current implementation tries to start the bundle, then adds an element to the new bundle. countNewElement is a better naming, because

  1. A new element is always added, but a bundle is not always started.
  2. The only usage of this method is in DoFnOp.processElement, it reads better to context.

We can also add a startBundle interface in the future, but there are currently no use cases.

The new name changes the subject to adding an element as a theme, and starting the bundle as a lazy side effect. It aligns more with what the method is actually doing.

One symmetry we are breaking is in DoFnOp.processElement, we had a tryStartBundle paired with a tryFinishBundle. Instead now we have countNewElement paired with tryFinishBundle. I think the old reading favors 1 element per bundle, while the new reading favors multiple elements in a bundle, and we check if the bundle is full/should be finished after each add.

Other changes:

  1. Update unit test names.
  2. Remove duplicate tests.

Please add a meaningful description for your change here


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

The current implementation tries to start the bundle, then adds an element to the new bundle. countNewElement is a better naming, because

1. A new element is always added, but a bundle is not always started.
2. The only usage of this method is in DoFnOp.processElement, it reads better to context.

We can also add a startBundle interface in the future, but there are currently no use cases.

The new name changes the subject to adding an element as a theme, and starting the bundle as a lazy side effect. It aligns more with what the method is actually doing.

One symmetry we are breaking is in DoFnOp.processElement, we had a `tryStartBundle` paired with a `tryFinishBundle`. Instead now we have `countNewElement` paired with `tryFinishBundle`. I think the old reading favors 1 element per bundle, while the new reading favors multiple elements in a bundle, and we check if the bundle is full/should be finished after each add.

Other changes:
1. Renamed some tests.
assertEquals(
"Expected pending bundle count to be 0", 0L, bundleManager.getPendingBundleCount());
assertFalse("Error didn't reset the bundle as expected.", bundleManager.isBundleStarted());
bundleManager.countNewElement();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This removed block is already tested in testTryStartBundleThrowsExceptionFromTheListener. The only difference is the cause of tryStartBundle failure. I'm isolating this part to a new and smaller test.

@github-actions
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@mynameborat
Copy link
Contributor

mynameborat commented Apr 17, 2023

Overall, I feel the PR doesn't address/improve much rather introduces more confusion. Here are the reasons

A new element is always added, but a bundle is not always started.

The name tryStartBundle already reflects this although in retrospection, I feel this isn't something we should necessarily cascade through names rather through API behavior. e.g., it should be start with semantics calling out as idempotent behavior within a bundle while lifecycle start for every new bundle.

Additionally, the symmetry of tryFinishBundle to countNewElement doesn't read well. What does it mean for the caller to invoke tryFinishBundle when there is nothing that conveys the caller to start one.

The only usage of this method is in DoFnOp.processElement, it reads better to context.

Not sure I agree with this as well. DoFnOp.processElement is a bridge between samza operator and beam ParDo and directly interacts with PushbackSideInputDoFnRunner. The sequence of PushbackSideInputDoFnRunner is startBundle, many processElementInReadyWindows and finishBundle which expects the callers to keep track of bundle boundaries and invoke these methods appropriately.

So it very much reads with the context of tryStartBundle --> startBundle, processElementInReadyWindows and tryFinishBundle --> finishBundle.

All said, I think there is scope for refactor and improvements as I see. e.g.,

The existing bundle manager does two things

  1. Handle bundle management (track bundles, invokes start and finish lifecycle method of the underlying runner)
  2. Process messages, handle watermark and process timers.

It is the way it is because we don't wait until we create a complete bundle and delegate it to the underlying runner as and when we receive elements. Due to this behavior, 1 & 2 exists together. It is possible to separate them out and have the BundleProcessor which handles 2) and is capable of handling early delegation (handle uncommitted bundles) vs lazy delegation (end of a fully formed bundle) and potentially handle watermark and timers as well depending on which model it runs.

All our use cases (classic & portability) uses the early delegation strategy and hence not critical to do the above mentioned refactor. There might be benefits in doing above if we plan to have multiple bundles in which case the bundle management might become a bit heavy and so does 2).

@alnzng
Copy link
Contributor

alnzng commented Apr 17, 2023

+1 stick with the naming tryStartBundle because of the following:

  • Literally BundleManager is the place to manage the lifecycle of the bundle of events. It makes sense that it has a step/function to run StartBundle which maps to the DoFn's StartBundle[1].
  • The current codes inside tryStartBundle does prepare for starting the bundle processing. For example, bundleProgressListener.onBundleStarted() actually invoke the underlying DoFn's PushbackSideInputDoFnRunner.StartBundle method.
  • Other Beam runners (e.g. Flink) does use a similar naming to callout StartBundle in their logical "BundleManager"[2].
    -- Flink doesn't abstract its "BundleManger" into a separate class as how Samza does today, but the intention of related codes is similar.

[1] https://beam.apache.org/releases/javadoc/2.2.0/org/apache/beam/sdk/transforms/ParDo.html
[2] https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L674

@ryucc ryucc changed the title Remane tryStartBundle to countNewElement. Add JavaDoc to BundleManager Apr 17, 2023
@ryucc
Copy link
Contributor Author

ryucc commented Apr 18, 2023

@mynameborat

The name tryStartBundle already reflects this although in retrospection,

I don't see how tryStartBundle reflects adding an element to me.

I would read tryStartBundle as if it hasn't started, start the bundle; else do nothing.

Also having a bundle start with 1 element only makes half-sense to me. I like starting with 0, because when I count, I start counting from 0. It gives me security.

@ryucc
Copy link
Contributor Author

ryucc commented Apr 18, 2023

The existing bundle manager does two things

imo two things is too many things. "Process messages, handle watermark and process timers." also makes it 4 things instead of 2 things. My ideal BundleManager should only count the elements and look at the time window to decide if the bundle is closed or not. The code that handles the watermark can ask BundleManager the state of the bundle, but it should not be BundleManager.

Not sure how much we agree on this, but I'm not changing this part at the moment.

Copy link
Contributor

@xinyuiscool xinyuiscool left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the enhancements!

Copy link
Contributor

@xinyuiscool xinyuiscool left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@xinyuiscool
Copy link
Contributor

The test failures are unrelated (downloading dependencies in mvn repo). Merge the pr since java precommit and other checks passed.

@xinyuiscool xinyuiscool merged commit 75f8d73 into apache:master Apr 20, 2023
@ryucc ryucc deleted the samza-bundle-manager-refac branch April 20, 2023 22:39
ryucc added a commit to linkedin/beam that referenced this pull request May 31, 2023
Extract BundleManager to an Interface in SamzaRunner (apache#26268)

Refactor DoFnOp.FutureCollectorImpl to a top level class in SamzaRunner (apache#26274)

Add JavaDoc to BundleManager in Samza Runner (apache#26287)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants