[prism] Support Custom WindowFns #31921

@lostluck

Description

This requires several improvements: first, non-merging support, and then merging support.

  1. First is simply supporting and allowing Custom WindowFns at all, specifically "non-merging" windows. This largely means having a window representation that's comparable for grouping by keys in Prism.

Goal:
This should allow the Python ValidatesRunner test "test_custom_window_type" to pass.

There are likely some Java side tests that require this as well.

There shouldn't be many changes needed for this part, since it's mostly a matter of allowing arbitrary byte-equality of windows, as identical windows should encode to the same bytes.

Likely we need to handle that here, among other places:

// TODO: Custom Window handling.

And implement a reasonable comparable type for typex.Window for use within prism.

Custom WindowFns have the following Coder:

https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1025

This is largely a timestamp followed by arbitrarily encoded bytes. We'll sometimes need to length-prefix that coder, as directed by the runner, if the coder isn't standard.
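As a rough illustration of the "comparable for grouping" piece, here's a minimal Go sketch of how prism might key elements by byte-equal custom windows. Everything in it (customWindowKey, windowedValue, groupByCustomWindow) is hypothetical, not existing prism code; it just shows that storing the encoded window bytes in a comparable struct is enough for map-based grouping.

package prism

// customWindowKey is a hypothetical comparable form of a custom window.
// Identical windows encode to identical bytes, so byte equality on the
// encoded form is sufficient for grouping; the payload is held as a string
// so the struct can be used directly as a Go map key.
type customWindowKey struct {
	maxTimestamp int64  // event time associated with the window, in millis.
	payload      string // raw bytes from the custom window coder.
}

// windowedValue is a stand-in for whatever prism uses to carry an element
// along with its already-decoded window metadata.
type windowedValue struct {
	maxTS   int64
	window  []byte
	element []byte
}

// groupByCustomWindow buckets elements by byte-equal windows, mirroring the
// grouping prism already does for standard window types ahead of a GBK.
func groupByCustomWindow(values []windowedValue) map[customWindowKey][][]byte {
	out := make(map[customWindowKey][][]byte)
	for _, v := range values {
		k := customWindowKey{maxTimestamp: v.maxTS, payload: string(v.window)}
		out[k] = append(out[k], v.element)
	}
	return out
}

A real implementation would plug into prism's existing element and window types rather than the stand-ins above.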


  2. Second, allowing for Custom Merging of windows.

Windowing strategies that need this (like sessions generally, but custom WindowFns in particular) set this field:

https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L1130

Testing here will be from the Python ValidatesRunner side, and likely various Java benchmarks and tests.

The trick here is that we need to create and send a new SideCar stage, specifically for handling the merge information, before processing a GBK.

The stage will contain the merge windows transform defined here:

https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L313

Its URN in prism is:

TransformMergeWindows = ptUrn(pipepb.StandardPTransforms_MERGE_WINDOWS)

Basically, once we've determined the GBK is firing, we first group by all the windows we currently have (including windows that may not be ready to fire yet), and then send those to a custom stage consisting of a DataSource, the MergeWindows transform, and a DataSink.

From there we aggregate the data for new merged windows from their constituent unmerged windows for a given key.
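To make that aggregation step concrete, here's a hedged sketch (continuing the hypothetical package and customWindowKey type from the earlier sketch) of folding the per-window buffers for one key into their merged windows, given whatever original-window to merged-window mapping the MergeWindows stage reports back. The mergeResult shape is an assumption for illustration, not the transform's actual output format.

// mergeResult maps each original window to the window it was merged into,
// as reported back by the MergeWindows SideCar stage (hypothetical shape).
type mergeResult map[customWindowKey]customWindowKey

// applyMerges folds the per-window element buffers for a single key into
// buffers keyed by their merged windows. Windows absent from the mapping
// (i.e. not consumed by any merge) are carried over unchanged.
func applyMerges(perWindow map[customWindowKey][][]byte, merges mergeResult) map[customWindowKey][][]byte {
	out := make(map[customWindowKey][][]byte, len(perWindow))
	for w, elems := range perWindow {
		target := w
		if merged, ok := merges[w]; ok {
			target = merged
		}
		out[target] = append(out[target], elems...)
	}
	return out
}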

Basically, we need to produce a bundle for this custom stage whenever we might need one for a given key, and only aggregate for that key once the watermark has passed the threshold for that key.
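And a minimal sketch of that gate, assuming the default trigger and no allowed lateness: a key's buffered windows are only aggregated once the input watermark is past all of them. The names and millisecond comparison are illustrative only.

// readyToFire reports whether every window buffered for a key is behind the
// given input watermark, i.e. the merge + GBK output for that key can now be
// produced without a later element extending one of its windows.
func readyToFire(perWindow map[customWindowKey][][]byte, inputWatermarkMillis int64) bool {
	if len(perWindow) == 0 {
		return false
	}
	for w := range perWindow {
		if w.maxTimestamp >= inputWatermarkMillis {
			return false
		}
	}
	return true
}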

The ability to have SideCar stages associated with a given stage for meta processing is very useful, and will pay dividends for the Drain implementation and for side input mapping.

https://beam.apache.org/documentation/programming-guide/#session-windows shows that merging windows is per key.

Given the complexity of this, though, we may split it into a second issue.
