From ad3b8cb06dc63b1f2a3f59d9ab3b880792cc3c71 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Fri, 9 Aug 2024 16:27:58 -0700 Subject: [PATCH 01/10] initial draft prism docs --- sdks/go/cmd/prism/WALKTHROUGH.md | 100 ++++++++++ .../pkg/beam/runners/prism/internal/README.md | 2 +- .../runners/prism/internal/engine/README.md | 184 ++++++++++++++++++ 3 files changed, 285 insertions(+), 1 deletion(-) create mode 100644 sdks/go/cmd/prism/WALKTHROUGH.md create mode 100644 sdks/go/pkg/beam/runners/prism/internal/engine/README.md diff --git a/sdks/go/cmd/prism/WALKTHROUGH.md b/sdks/go/cmd/prism/WALKTHROUGH.md new file mode 100644 index 000000000000..0f93140896e4 --- /dev/null +++ b/sdks/go/cmd/prism/WALKTHROUGH.md @@ -0,0 +1,100 @@ + + +# Prism Deep Dive Walkthrough + +The Apache Beam Go SDK Prism Runner is a portable local Beam runner, that aims to be +the best way to test and validate your pipelines outside of production. + +In short: It's too hard to make sure your pipeline will work the way you expect. + +Runners have different semantics, and implement different feature sets, and +have different behaviors. They're all bigger than expected. Some require +tuning to run properly locally. Some requires payment. + +We largely operate as though a user has synced to the repo, which shouldn't be the case for all users. + +# Runner Capabilities + +Dataflow has two different execution modes, which have their own execution characteristics. +Dataflow requires a cloud connection, and costs money. + +Dataflow doesn't support ProcessingTime in Batch. +Flink doesn't support ProcessingTime, and requires specific Java configuration. +Java Direct runner only works for Java, and requires specific Java configuration. +Python Direct/Portable runner doesn't handle streaming, and requires specific Python configuration. +Go Direct Runner has never received much development, because the Go SDK was built +against Dataflow and it's google internal counterpart, Flume. + +People references the Direct runner, but don't clarify which one, because language communities +tend to be insular by nature. + +Direct runners often cheat when executing their matching language SDKs. + +This is despite Beam providing cross language abilities by it's portable nature. + +The space is large and complicated, which we try to sort out with these large tables, showing +what capabilities SDKs have, versus what the runners are able to execute. + +When an error occurs, in many runners, it's not entirely clear what the remeadiation strategy is. +We've built Beam runners on top of Flink and Spark, but is the error from my pipeline, +from the Beam interface layers, or from the underlying engine? + +Dataflow has various logs, but one can sometimes need to be a Dataflow engineer to dig deeper into +an issue there. + +## Runners as a piece of SDK development + +Beam provides a model and set of interfaces for how to build and execute a data pipeline. + +We provide a guide for how to author a runner, which is generally tuned to how to wrap an existing runner, +and points to wrappers authored in Java. It doesn't really teach how to write a runner. + +We don't provide an SDK authoring Guide at all, and one either needs to already be a expert in the Beam model +or authored an existing SDK to be able to bootstrap the process. + +One also needs to be already comfortable with starting up a runner on the side to + execute against, if you manage to get far enough to submit jobs as portable beam pipelines. 
+ +## History + +Development on Prism started in late 2022 as lostluck wanted an easier way to validate +Beam pipelines. In particular for the Go SDK. + +It was too hard to use a runner that was not built into the SDK. + +Each of the other available runners were not appropriate for this purpose, or required +heavier weight set up. + +It's much easier these days to set up Python and Java, but tooling is inconsistent for +those not immersed in it. + + +## Why in Go? + +Largely because Prism came out of an effort to make the Go SDK easier to test and adopt, it was a natural fit. + +But Go has advantages over Java and Python when it comes to portable use. + +Go compiles to small, static binaries. +Go can compile binaries cross platform trivially. +It's garbage collected, which meshes well with how Beam operates. + +While the Beam Java and Python SDK Worker containers need hundreds to thousands of megabytes of code and jars. + diff --git a/sdks/go/pkg/beam/runners/prism/internal/README.md b/sdks/go/pkg/beam/runners/prism/internal/README.md index d19f2ca5fa60..776a73cd65de 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/README.md +++ b/sdks/go/pkg/beam/runners/prism/internal/README.md @@ -48,7 +48,7 @@ except for dependency on `engine.TentativeData` which will likely be removed at `internal` AKA the package in this directory root. Contains fhe job execution flow. Jobs are sent to it from `jobservices`, and those jobs are then executed by coordinating -with the `engine` and `worker` packages, and handlers urn. +with the `engine` and `worker` packages, and handlers. Most configurable behavior is determined here. `web` contains a web server to visualize aspects of the runner, and should never be diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/README.md b/sdks/go/pkg/beam/runners/prism/internal/engine/README.md new file mode 100644 index 000000000000..b9c777cef41b --- /dev/null +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/README.md @@ -0,0 +1,184 @@ +# Prism ElementManager Overview + +This README documents in broad strokes how Prism executes a pipeline. + +## Job Execution + +```mermaid +sequenceDiagram + autonumber + participant X as Job Executor + Note over X: Pre-process job's pipeline graph into stages + + create participant S as Stages + X->>S: create + create participant W as SDK Worker + X->>W: create + create participant EM as ElementManager + X->>EM: Configure Stages in ElementManager + activate EM + X->>+EM: Request Bundles + loop For each bundle from the ElementManager + EM->>X: Return a Bundle for a Stage + deactivate EM + X->>S: Execute Bundle + activate S + S->>+W: ProcessBundleRequest + S-->W: Stream Data and State for Bundle + W->>-S: ProcessBundleResponse + alt Bundle success + S->>EM: Persist Bundle Results + else Bundle failure + S->>EM: Report Bundle Failure + end + S->>X: Return Bundle status + deactivate S + alt Bundle success + Note right of X: Continue Processing. + else Bundle failure + Note right of X: Fail job + end + end + destroy S + destroy EM + Note over X: Terminate Job +``` + +When a job is submitted to Prism, and has been accepted by Job Management, +it is then passed to the Job Executor function. The Job's pipeline graph is +then preprocessed to handle performance improvements like Graph Fusion, and +Combiner Lifting, but also to do other substitutions such as breaking down +SplittableDoFns into their executable components. Preprocessing returns +a set of stateless executable stages. 
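+
+As a rough sketch only (the type and field names below are illustrative assumptions,
+not Prism's actual code), a fused stage can be pictured as a small descriptor:
+
+```go
+package engine
+
+// Stage is a hypothetical summary of a fused, executable stage as
+// described above; Prism's real stage type differs.
+type Stage struct {
+	ID           string   // unique identifier for the stage
+	Environment  string   // SDK environment that executes this stage
+	Transforms   []string // IDs of the transforms fused into this stage
+	PrimaryInput string   // the single parallel input PCollection
+	SideInputs   []string // zero or more side input PCollections
+	Outputs      []string // zero or more output PCollections
+	Stateful     bool     // whether execution must be serialized per user key
+	Aggregating  bool     // whether the stage aggregates, e.g. a GroupByKey
+}
+```
+
+The key point is that a stage records only such summarized properties; the
+engine never needs the original transform protos.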
+ +Stages in hand, the job executor produces SDK workers for each environment, +and configures an ElementManager with the stage information, so it can +begin to produce and manage bundles. Those bundles are then handed to +appropriate workers for processing. + +The ElementManager will produce as many bundles as are ready for execution, +WRT necessary restrictions on processing. For example, stateful stages may +require that only a single inprogress bundle may operate on a given user key +at a time, while aggregations like GroupByKey will only execute when their +windowing strategy dictates, and DoFns with side inputs can only execute when +all approprate side inputs are ready. + +Architecturally, the ElementManager is only aware of the properties of the +fused stages, and not their actual relationships with the Beam Protocol Buffers. +The ElementManager is not aware of individual transforms. + +Interfacing with SDK workers is left to the stateless job executore stages as +needed for bundle processing. + +## How are bundles produced? + +Work is divided into Bundles for execution, typically, on end user DoFns within +SDK workers. + +Producing bundles is the ElementManager's job. The ElementManager is the heart +of Prism. The element manager tracks the state for each stage, which includes a +stage's relationships with others, the pending input elements, the various watermarks, +whether it is stateful or aggregating. + +Each executing pipeline has it's own instance of the ElementManager which +manages all element data for the pipeline. +The core loop of the engine is to produce bundles for stages to process. +A bundle represents some number of elements as well as a stage to process them on. +Each stage is associated with an environment that can execute them, and +can assign work over the FnAPI to an SDK worker. + +Prism does this by tracking the event time watermark for each +stage, as well as the set of pending elements and state per stage. + +Pending Elements are elements that have been received as input by the stage +but have not yet been processed. + + +```mermaid +graph TD; + Start>Start Job] + subgraph X [Job Executor] + Build[" + Optimizes graph into stages. + Configures them on the ElementManager. + Bootstrap stages with Impulses. + "] + ExecuteBundles["Start Execution"] + Process[" + Process Bundle + "] + BundleResponse{" + BundleResponse + "} + Success(" + Terminate Job as Successful + ") + Failed(" + Terminate Job as Failed + ") + end + subgraph EM [Element Manager] + Bundles[" + Bundles() + "] + Refresh[" + Refresh watermarks for stages. + "] + CheckReady[" + For each stage: + see if it has pending elements + elegible to process with + the current watermark. + "] + Emit[" + Output Bundle for Processing + "] + Quiescense{" + Quiescense check: + Can the pipeline + make progress? 
+ "} + Persist[" + Persist Bundle Results + "] + end + + Start-->Build + Build==>ExecuteBundles + ExecuteBundles== Request Bundles ==>Bundles + ExecuteBundles== Receive Bundle ==> Process + ExecuteBundles== No More Bundles ==>Success + Bundles== Start Refresh Loop ==>Refresh + Refresh== stages with advanced watermarks ==>CheckReady + CheckReady== "Has ready work" ==>Emit + Emit ==> Quiescense + Quiescense== yes ==>Refresh + Quiescense-- no -->Failed + Emit== Bundle to process==>ExecuteBundles + BundleResponse == "Success" ==> Persist + BundleResponse -- "Failure" --> Failed + Process ==> BundleResponse + Persist == Add stage to watermark refreshes ==> Refresh + + Start ~~~ Bundles + Build ~~~ Bundles +``` + + + + +## Glossary + +* Element: A single value of data to be processed, or a timer to trigger. +* Stage: A fused grouping of one or more transforms with a single parallel input PCollection, + zero or more side input PCollecitons, and zero or more output PCollections. + The engine is unaware of individual user transforms, and relies on the calling + job executor to configure how stages are related. +* Bundle: An arbitrary non-empty set of elements, to be executed by a stage. +* Watermark: An event time which relates to the the readiness to process data in the engine. + Each stage has several watermarks it tracks: Input, Output, and Upstream. +* Quiescense: Wether the pipeline is or is able to perform work. + * The pipeline will try to advance all watermarks to infinity, and attempt to + process all pending elements. + * A pipeline will successfully terminate when there are no pending elements to process, + and no outstanding in progress bundles. From 2aa53363554b630f484155427f2fde00543320fc Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Mon, 12 Aug 2024 16:20:06 -0700 Subject: [PATCH 02/10] Consolidate to a single deep dive document in the internal package. --- .../pkg/beam/runners/prism/internal/README.md | 328 +++++++++++++++++- .../runners/prism/internal/engine/README.md | 184 ---------- 2 files changed, 320 insertions(+), 192 deletions(-) delete mode 100644 sdks/go/pkg/beam/runners/prism/internal/engine/README.md diff --git a/sdks/go/pkg/beam/runners/prism/internal/README.md b/sdks/go/pkg/beam/runners/prism/internal/README.md index 776a73cd65de..a027e0ee2984 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/README.md +++ b/sdks/go/pkg/beam/runners/prism/internal/README.md @@ -17,6 +17,8 @@ under the License. --> +The following refers to the current state of Prism, and is subject to change. + # Prism internal packages Go has a mechanism for ["internal" packages](https://go.dev/doc/go1.4#internalpackages) @@ -26,7 +28,7 @@ This mechanism is used thoroughly for Prism to ensure we can make changes to the runner's internals without worrying about the exposed surface changes breaking non-compliant users. -# Structure +## Structure Here's a loose description of the current structure of the runner. Leaf packages should not depend on other parts of the runner. Runner packages can and do depend on other @@ -37,8 +39,9 @@ Handler configurations are registered by dependant packages. `urns` contains beam URN strings pulled from the protos. Leaf package. -`engine` contains the core manager for handling elements, watermarks, and windowing strategies. -Determines bundle readiness, and stages to execute. Leaf package. +`engine` contains the core manager for handling elements, watermarks, windowing strategies, +timers, and state. 
Determines bundle readiness, and stages to execute. Leaf package. +Should never depend on the protocol buffers. `jobservices` contains GRPC service handlers for job management and submission. Should only depend on the `config` and `urns` packages. @@ -46,9 +49,10 @@ Should only depend on the `config` and `urns` packages. `worker` contains interactions with FnAPI services to communicate with worker SDKs. Leaf package except for dependency on `engine.TentativeData` which will likely be removed at some point. -`internal` AKA the package in this directory root. Contains fhe job execution -flow. Jobs are sent to it from `jobservices`, and those jobs are then executed by coordinating -with the `engine` and `worker` packages, and handlers. +`internal` AKA the package in this directory root. Contains the job execution +flow. Jobs are sent to it from `jobservices`, through the Job Executor function, +and those jobs are then executed by coordinating with the `engine` and `worker` packages. +Handles the pipeline protos directly. Most configurable behavior is determined here. `web` contains a web server to visualize aspects of the runner, and should never be @@ -56,12 +60,320 @@ depended on by the other packages for the runner. It will primarily abstract access to pipeline data through a JobManagement API client. If no other option exists, it may use direct access to exported data from the other packages. -# Testing +## Testing The sub packages should have reasonable Unit Test coverage in their own directories, but -most features will be exercised via executing pipelines in this package. +most features will be exercised via executing pipelines in the internal package, and engine +package. To avoid overly re-implementing tests, use of the "_test" formalism to validate +coverage through re-using tests from sdks/go/tests is recommended. For the time being test DoFns should be added to standard build in order to validate execution coverage, in particular for Combine and Splittable DoFns. Eventually these behaviors should be covered by using Prism in the main SDK tests. + +# Deep Dive + +This section covers how Prism operates just above the code level, and addresses the +questions of "how it works". + +## Job Execution + +```mermaid +sequenceDiagram + autonumber + participant X as Job Executor + Note over X: Pre-process pipeline
graph into stages + + create participant S as Stages + X->>S: create + create participant W as SDK Worker + X->>W: create + create participant EM as ElementManager + X->>EM: Configure Stages in ElementManager + activate EM + X->>+EM: Request Bundles + loop For each bundle from the ElementManager + EM->>X: Return a Bundle for a Stage + deactivate EM + X->>S: Execute Bundle + activate S + S->>+W: ProcessBundleRequest + S-->W: Stream Data and State for Bundle + W->>-S: ProcessBundleResponse + alt Bundle success + S->>EM: Persist Bundle Results + else Bundle failure + S->>EM: Report Bundle Failure + end + S->>X: Return Bundle status + deactivate S + alt Bundle success + Note right of X: Continue Processing. + else Bundle failure + Note right of X: Fail job + end + end + destroy S + destroy EM + Note over X: Terminate Job +``` + +When a job is submitted to Prism, and has been accepted by Job Management, +it is then passed to the Job Executor function. The Job's pipeline graph is +then preprocessed to handle performance improvements like Graph Fusion, and +Combiner Lifting, but also to do other substitutions such as breaking down +SplittableDoFns into their executable components. Preprocessing returns +a set of stateless executable stages. + +Stages in hand, the job executor produces SDK workers for each environment, +and configures an ElementManager with the stage information, so it can +begin to produce and manage bundles. Those bundles are then handed to +appropriate workers for processing. + +The ElementManager will produce as many bundles as are ready for execution, +WRT necessary restrictions on processing. For example, stateful stages may +require that only a single inprogress bundle may operate on a given user key +at a time, while aggregations like GroupByKey will only execute when their +windowing strategy dictates, and DoFns with side inputs can only execute when +all approprate side inputs are ready. + +Architecturally, the ElementManager is only aware of the properties of the +fused stages, and not their actual relationships with the Beam Protocol Buffers. +The ElementManager is not aware of individual transforms. + +Interfacing with SDK workers is left to the stateless job executore stages as +needed for bundle processing. + +### How are bundles produced? + +Work is divided into Bundles for execution, typically, on end user DoFns within +SDK workers. + +Producing bundles is the ElementManager's job. The ElementManager is the heart +of Prism. The element manager tracks the state for each stage, which includes a +stage's relationships with others, the pending input elements, the various watermarks, +whether it is stateful or aggregating. + +Each executing pipeline has it's own instance of the ElementManager which +manages all element data for the pipeline. +The core loop of the engine is to produce bundles for stages to process. +A bundle represents some number of elements as well as a stage to process them on. +Each stage is associated with an environment that can execute them, and +can assign work over the FnAPI to an SDK worker. + +Prism does this by tracking the event time watermark for each +stage, as well as the set of pending elements and state per stage. + +Pending Elements are elements that have been received as input by the stage +but have not yet been processed. + + +```mermaid +graph TD; + Start>Start Job] + subgraph X [Job Executor] + Build[" + Optimizes graph into stages. + Configures them on the ElementManager. + Bootstrap stages with Impulses. 
+ "] + ExecuteBundles["Start Execution"] + Process[" + Process Bundle + "] + BundleResponse{" + BundleResponse + "} + Success(" + Terminate Job as Successful + ") + Failed(" + Terminate Job as Failed + ") + end + subgraph EM [Element Manager] + Bundles[" + Bundles() + "] + Refresh[" + Refresh watermarks for stages. + "] + CheckReady[" + For each stage: + see if it has pending elements + elegible to process with + the current watermark. + "] + Emit[" + Output Bundle for Processing + "] + Quiescense{" + Quiescense check: + Can the pipeline + make progress? + "} + Persist[" + Persist Bundle Results + "] + end + + Start-->Build + Build==>ExecuteBundles + ExecuteBundles== Request Bundles ==>Bundles + ExecuteBundles== Receive Bundle ==> Process + ExecuteBundles== No More Bundles ==>Success + Bundles== Start Refresh Loop ==>Refresh + Refresh== stages with advanced watermarks ==>CheckReady + CheckReady== "Has ready work" ==>Emit + Emit ==> Quiescense + Quiescense== yes ==>Refresh + Quiescense-- no -->Failed + Emit== Bundle to process==>ExecuteBundles + BundleResponse == "Success" ==> Persist + BundleResponse -- "Failure" --> Failed + Process ==> BundleResponse + Persist == Add stage to watermark refreshes ==> Refresh + + Start ~~~ Bundles + Build ~~~ Bundles +``` + +## Threading and Concurrency Model + +Prism leverages Go's lightweight threading primitive, called goroutines, to handle +concurrent processing of bundles. Go handles scheduling execution of goroutines onto +operating system threads for execution, and it's reasonably effective at this. +Goroutines allow prism to distribute multiple bundles to SDK workers simultaneously. +There are some benefits to re-use Goroutines, but not critically so. +Generally the goal is to strategically limit parallelism to avoid +wasting time in the goroutine scheduler. + +For a single Job Goroutines are initialized to the following cardinatlities +Per Job, Per Environment, and Per Bundle. + +This section will attempt to outline the threads associated with Prism when +executing a job. This will not include SDK side threads, such as those +within containers, or started by an external worker service. + +As a rule of thumb, each Bundle is processed on +an independant goroutine, with a few exceptions. + +jobservices.Server implements a beam JobManagmenent GRPC services. GRPC servers +have goroutines managed by GRPC itself. Call this G goroutines. + +When RunJob is called, the server starts a goroutine with the Job Executor function. + +The Job Executor function will start goroutines for each worker environment in order +to manage workers. One for manageing the environment itself, such as a docker container, +and one for the FnAPI server for that environment, to communicate with the SDK Worker +in that environment. These will persist for the lifetime of the job. + +After producing stages for the job the Job Executor goroutine will wait on Bundles +from the ElementManager, and execute them in parallel. +Each bundle has it's own goroutine. +This is by default configured to a maximum of 8 simultaneous Bundles. + +The bundle goroutine will communicate with the worker associated with the bundle's stage. +This handles sending the bundle for execution on the SDK worker, managing progress and +split requests, and persisting data and other results after the bundle completes. + +The FnAPI server side worker will have several goroutines to handle requests and responses +from the SDK worker. FnAPI services are typically bi directional streaming RPCs. 
+There will be one goroutine handling requests being sent to the SDK, and another for responses +from the SDK. +So there will be a pair of goroutines for each of the Control, Data, and State streams, and a +single one for Logging. + +The ElementManager returns a channel to the Job Executor in order to relay those bundles, and +starts two goroutines in order to populate that channel with bundles. +One goroutine is tasked with evaluating and refreshing the watermark whenever a stage persists +a bundle. This goroutine sends bundles on the channel when appropriate. This goroutine waits on +a condition variable for changes from processed bundles. +The other goroutine blocks until there are no more pending elements, at which point it +cancels the watermark evaluating goroutine, and unblocks the condition variable, in that order. + +Prism's main thread when run as a stand alone command will block forever after +initializing the beam JobManagment services. Similarly, a built in prism instance +in the Go SDK will start a Job Management instance, and the main thread is blocked +while the job is executing through the "Universal" runner handlers for a pipeline. + +A job thus has the following number of persistent goroutines in execution: + +For a Prism instance: +* G for the job services. + +For each Job: +* 1 for the Job Executor +* 2 for the ElementManager + +For each Environment: +* 3*2 + 1 for the FnAPI Control, Data, State, and Logging Streams +* G for the worker's GRPC server. + +For each Bundle: +* 1 to handle bundle execution, up to the configured maxium parallel bundles (default 8) + +Letting E be the number of environments in the job, and B maximum number of parallel bundles: + +Total Goroutines = G + (1 + 2) + E*(3*2 +1 + G) + B(1) + +Total Goroutines = G + 3 + 7E + E*G + B + +Total Goroutines = G(E + 1) + 7E + B + 3 + +So for a job J with 1 eviroment, and the default maximum parallel bundles, 8: + +Total Goroutines for Job J = G((1) + 1) + 7(1) + (8) + 3 + +Total Goroutines for Job J = 2G + 7 + 8 + 3 + +Total Goroutines for Job J = 2G + 18 + +2 GRPC servers + 18 goroutines may sound like a lot, but most processes in a Prism instance +are waiting for some trigger to execute, or some timers to go off. FnAPI messages are multiplexed +so the expectation is the data service goroutines will be the busiest moving data back and forth +from the SDK. + +A consequence of this approach is the need to take care in locking shared resources and data +when they may be accessed by multiple goroutines. In particular, is all done in the ElementManager +which has locks for each stage in order to serialze access to it's state. This state is notably +accessed by the Bundle goroutines on persisting data back to the Element manager. + +For best performance, we do as much work as possible in the Bundle Goroutines since they are +what can most dynamically scale out, as bundle generation is handled by the single watermark +evaluation thread. This may include migrating handling of data responses away from the Data +received goroutine and into the Bundle processing goroutine, so bundles are less likely to +block each other. + +## Durability Model + +Prism keeps all data in memory, and doesn't write anything durably to disk. +Recommended only for testing purposes. It's not suitable for any long term production +work due to the risk of dataloss or recomputing. + +SDK side code however may be make durable changes or side effects in external systems. +Prism cannot restrict such behavior. 
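+
+For example (a minimal, plain-Go illustration, not a specific Beam API): if user
+code called from a DoFn appends to a local file as below, that write persists in
+the outside world even though Prism itself keeps nothing durably.
+
+```go
+package sideeffect
+
+import (
+	"fmt"
+	"os"
+)
+
+// recordEvent appends one line to a log file outside of the runner's
+// control. Prism can neither observe nor roll back this durable change.
+func recordEvent(path, event string) error {
+	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	_, err = fmt.Fprintln(f, event)
+	return err
+}
+```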
+ +Prism doesn't retry failed bundle, and simply fails the job instead. + +## Execution Distribution Model + +Prism is intended to be single machine, local runner. If it does start up docker containers +for SDK environments, they'll be on the same machine as Prism. + +# Glossary + +* Element: A single value of data to be processed, or a timer to trigger. +* Stage: A fused grouping of one or more transforms with a single parallel input PCollection, + zero or more side input PCollecitons, and zero or more output PCollections. + The engine is unaware of individual user transforms, and relies on the calling + job executor to configure how stages are related. +* Bundle: An arbitrary non-empty set of elements, to be executed by a stage. +* Watermark: An event time which relates to the the readiness to process data in the engine. + Each stage has several watermarks it tracks: Input, Output, and Upstream. +* Quiescense: Wether the pipeline is or is able to perform work. + * The pipeline will try to advance all watermarks to infinity, and attempt to + process all pending elements. + * A pipeline will successfully terminate when there are no pending elements to process, + and no outstanding in progress bundles. diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/README.md b/sdks/go/pkg/beam/runners/prism/internal/engine/README.md deleted file mode 100644 index b9c777cef41b..000000000000 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/README.md +++ /dev/null @@ -1,184 +0,0 @@ -# Prism ElementManager Overview - -This README documents in broad strokes how Prism executes a pipeline. - -## Job Execution - -```mermaid -sequenceDiagram - autonumber - participant X as Job Executor - Note over X: Pre-process job's pipeline graph into stages - - create participant S as Stages - X->>S: create - create participant W as SDK Worker - X->>W: create - create participant EM as ElementManager - X->>EM: Configure Stages in ElementManager - activate EM - X->>+EM: Request Bundles - loop For each bundle from the ElementManager - EM->>X: Return a Bundle for a Stage - deactivate EM - X->>S: Execute Bundle - activate S - S->>+W: ProcessBundleRequest - S-->W: Stream Data and State for Bundle - W->>-S: ProcessBundleResponse - alt Bundle success - S->>EM: Persist Bundle Results - else Bundle failure - S->>EM: Report Bundle Failure - end - S->>X: Return Bundle status - deactivate S - alt Bundle success - Note right of X: Continue Processing. - else Bundle failure - Note right of X: Fail job - end - end - destroy S - destroy EM - Note over X: Terminate Job -``` - -When a job is submitted to Prism, and has been accepted by Job Management, -it is then passed to the Job Executor function. The Job's pipeline graph is -then preprocessed to handle performance improvements like Graph Fusion, and -Combiner Lifting, but also to do other substitutions such as breaking down -SplittableDoFns into their executable components. Preprocessing returns -a set of stateless executable stages. - -Stages in hand, the job executor produces SDK workers for each environment, -and configures an ElementManager with the stage information, so it can -begin to produce and manage bundles. Those bundles are then handed to -appropriate workers for processing. - -The ElementManager will produce as many bundles as are ready for execution, -WRT necessary restrictions on processing. 
For example, stateful stages may -require that only a single inprogress bundle may operate on a given user key -at a time, while aggregations like GroupByKey will only execute when their -windowing strategy dictates, and DoFns with side inputs can only execute when -all approprate side inputs are ready. - -Architecturally, the ElementManager is only aware of the properties of the -fused stages, and not their actual relationships with the Beam Protocol Buffers. -The ElementManager is not aware of individual transforms. - -Interfacing with SDK workers is left to the stateless job executore stages as -needed for bundle processing. - -## How are bundles produced? - -Work is divided into Bundles for execution, typically, on end user DoFns within -SDK workers. - -Producing bundles is the ElementManager's job. The ElementManager is the heart -of Prism. The element manager tracks the state for each stage, which includes a -stage's relationships with others, the pending input elements, the various watermarks, -whether it is stateful or aggregating. - -Each executing pipeline has it's own instance of the ElementManager which -manages all element data for the pipeline. -The core loop of the engine is to produce bundles for stages to process. -A bundle represents some number of elements as well as a stage to process them on. -Each stage is associated with an environment that can execute them, and -can assign work over the FnAPI to an SDK worker. - -Prism does this by tracking the event time watermark for each -stage, as well as the set of pending elements and state per stage. - -Pending Elements are elements that have been received as input by the stage -but have not yet been processed. - - -```mermaid -graph TD; - Start>Start Job] - subgraph X [Job Executor] - Build[" - Optimizes graph into stages. - Configures them on the ElementManager. - Bootstrap stages with Impulses. - "] - ExecuteBundles["Start Execution"] - Process[" - Process Bundle - "] - BundleResponse{" - BundleResponse - "} - Success(" - Terminate Job as Successful - ") - Failed(" - Terminate Job as Failed - ") - end - subgraph EM [Element Manager] - Bundles[" - Bundles() - "] - Refresh[" - Refresh watermarks for stages. - "] - CheckReady[" - For each stage: - see if it has pending elements - elegible to process with - the current watermark. - "] - Emit[" - Output Bundle for Processing - "] - Quiescense{" - Quiescense check: - Can the pipeline - make progress? - "} - Persist[" - Persist Bundle Results - "] - end - - Start-->Build - Build==>ExecuteBundles - ExecuteBundles== Request Bundles ==>Bundles - ExecuteBundles== Receive Bundle ==> Process - ExecuteBundles== No More Bundles ==>Success - Bundles== Start Refresh Loop ==>Refresh - Refresh== stages with advanced watermarks ==>CheckReady - CheckReady== "Has ready work" ==>Emit - Emit ==> Quiescense - Quiescense== yes ==>Refresh - Quiescense-- no -->Failed - Emit== Bundle to process==>ExecuteBundles - BundleResponse == "Success" ==> Persist - BundleResponse -- "Failure" --> Failed - Process ==> BundleResponse - Persist == Add stage to watermark refreshes ==> Refresh - - Start ~~~ Bundles - Build ~~~ Bundles -``` - - - - -## Glossary - -* Element: A single value of data to be processed, or a timer to trigger. -* Stage: A fused grouping of one or more transforms with a single parallel input PCollection, - zero or more side input PCollecitons, and zero or more output PCollections. 
- The engine is unaware of individual user transforms, and relies on the calling - job executor to configure how stages are related. -* Bundle: An arbitrary non-empty set of elements, to be executed by a stage. -* Watermark: An event time which relates to the the readiness to process data in the engine. - Each stage has several watermarks it tracks: Input, Output, and Upstream. -* Quiescense: Wether the pipeline is or is able to perform work. - * The pipeline will try to advance all watermarks to infinity, and attempt to - process all pending elements. - * A pipeline will successfully terminate when there are no pending elements to process, - and no outstanding in progress bundles. From 6fff4780a6d398d66392665753b8c6900af305ee Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Tue, 13 Aug 2024 08:31:15 -0700 Subject: [PATCH 03/10] Fix goroutine count for environments, link issue. --- .../pkg/beam/runners/prism/internal/README.md | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/README.md b/sdks/go/pkg/beam/runners/prism/internal/README.md index a027e0ee2984..3ae2e8b04c3e 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/README.md +++ b/sdks/go/pkg/beam/runners/prism/internal/README.md @@ -264,10 +264,10 @@ have goroutines managed by GRPC itself. Call this G goroutines. When RunJob is called, the server starts a goroutine with the Job Executor function. -The Job Executor function will start goroutines for each worker environment in order -to manage workers. One for manageing the environment itself, such as a docker container, -and one for the FnAPI server for that environment, to communicate with the SDK Worker -in that environment. These will persist for the lifetime of the job. +The Job Executor function will start goroutines for each worker Environment in order +to manage workers. One for managing the Environment itself, such as a docker container, +and one for the FnAPI server for that Environment, to communicate with the SDK Worker +in that Environment. These will persist for the lifetime of the job. After producing stages for the job the Job Executor goroutine will wait on Bundles from the ElementManager, and execute them in parallel. @@ -308,6 +308,7 @@ For each Job: * 2 for the ElementManager For each Environment: +* 2 for the Environment itself * 3*2 + 1 for the FnAPI Control, Data, State, and Logging Streams * G for the worker's GRPC server. @@ -316,24 +317,24 @@ For each Bundle: Letting E be the number of environments in the job, and B maximum number of parallel bundles: -Total Goroutines = G + (1 + 2) + E*(3*2 +1 + G) + B(1) +Total Goroutines = G + (1 + 2) + E*(3*2 +1 + 2 + G) + B(1) -Total Goroutines = G + 3 + 7E + E*G + B +Total Goroutines = G + 3 + 9E + E*G + B -Total Goroutines = G(E + 1) + 7E + B + 3 +Total Goroutines = G(E + 1) + 9E + B + 3 So for a job J with 1 eviroment, and the default maximum parallel bundles, 8: -Total Goroutines for Job J = G((1) + 1) + 7(1) + (8) + 3 +Total Goroutines for Job J = G((1) + 1) + 9(1) + (8) + 3 -Total Goroutines for Job J = 2G + 7 + 8 + 3 +Total Goroutines for Job J = 2G + 9 + 8 + 3 -Total Goroutines for Job J = 2G + 18 +Total Goroutines for Job J = 2G + 20 -2 GRPC servers + 18 goroutines may sound like a lot, but most processes in a Prism instance -are waiting for some trigger to execute, or some timers to go off. 
FnAPI messages are multiplexed
-so the expectation is the data service goroutines will be the busiest moving data back and forth
-from the SDK.
+2 GRPC servers + 20 goroutines may sound like a lot, but most processes in a Prism instance
+are waiting for some trigger to execute. They are not busy waiting or spin looping.
+FnAPI messages are multiplexed so the expectation is the data service goroutines will
+be the busiest moving data back and forth from the SDK.
 
 A consequence of this approach is the need to take care in locking shared resources and data
 when they may be accessed by multiple goroutines. In particular, this is all done in the ElementManager
 which has locks for each stage in order to serialize access to its state. This state is notably
 accessed by the Bundle goroutines on persisting data back to the ElementManager.
 
 For best performance, we do as much work as possible in the Bundle Goroutines since they are
 what can most dynamically scale out, as bundle generation is handled by the single watermark
 evaluation thread. This may include migrating handling of data responses away from the Data
 received goroutine and into the Bundle processing goroutine, so bundles are less likely to
 block each other.
 
+As one G of goroutines is the single Job Management instance for Prism itself, and the other is
+for the worker endpoint, these are not wasted either. Prism can assign unique names to workers
+to be able to identify which job they're a part of, so it's possible to avoid the per-job and
+per-worker GRPC services, and multiplex from there. (See https://github.com/apache/beam/issues/32167)
+
 ## Durability Model

From 3cae7d62f4c99a9f35003e13dfc3fad96d854e35 Mon Sep 17 00:00:00 2001
From: lostluck <13907733+lostluck@users.noreply.github.com>
Date: Sun, 18 Aug 2024 14:02:43 -0700
Subject: [PATCH 04/10] Add description of the Bundles Channel.

---
 .../go/pkg/beam/runners/prism/internal/README.md | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/README.md b/sdks/go/pkg/beam/runners/prism/internal/README.md
index 3ae2e8b04c3e..499d07d4cf31 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/README.md
+++ b/sdks/go/pkg/beam/runners/prism/internal/README.md
@@ -352,6 +352,22 @@ for the worker endpoint, these are not wasted either. Prism can assign unique na
 to be able to identify which job they're a part of, so it's possible to avoid the per-job and
 per-worker GRPC services, and multiplex from there. (See https://github.com/apache/beam/issues/32167)
 
+A channel is being used to move ready-to-execute bundles from the ElementManager to the Job Executor.
+This may be unbuffered (the default), which means
+serializing how bundles are generated for execution,
+and there being at most a single "readyToExecute"
+bundle at a time. An unbuffered channel puts a
+bottleneck on the job since there may be additional
+ready work to execute. On the other hand, it also
+allows for bundles to be made larger, as more data
+may have arrived.
+
+The channel could be made buffered, to allow
+multiple bundles to be prepared for execution.
+This would lead to lower latency, as bundles could be made smaller and faster to execute, and it would
+permit pipelining in work generation, but may lead
+to higher lock contention and variability in execution.
+
 ## Durability Model
 
 Prism keeps all data in memory, and doesn't write anything durably to disk.

From ea772362d1d6137b74b3b400a37ef85b00e20906 Mon Sep 17 00:00:00 2001
From: lostluck <13907733+lostluck@users.noreply.github.com>
Date: Sun, 18 Aug 2024 15:49:52 -0700
Subject: [PATCH 05/10] Watermark Eval loop, and bundle preparation.
---
 .../pkg/beam/runners/prism/internal/README.md | 96 +++++++++++++++++++
 1 file changed, 96 insertions(+)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/README.md b/sdks/go/pkg/beam/runners/prism/internal/README.md
index 499d07d4cf31..30dd525f2260 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/README.md
+++ b/sdks/go/pkg/beam/runners/prism/internal/README.md
@@ -239,6 +239,96 @@ graph TD;
     Build ~~~ Bundles
 ```
 
+### Watermark Evaluation and Bundle Generation
+
+Here's a closer look at the watermark evaluation goroutine.
+
+A key idea is that execution can't progress unless something has changed a stage.
+A stage can only change as a result of a bundle being persisted.
+
+Persisting a bundle causes zero or more new elements to become pending for one or more stages (including the stage that was just persisted).
+
+A stage's watermarks determine what elements are
+ready to be processed, and thus what can be included in a bundle.
+
+Each stage has several watermarks: upstream, input, and output.
+The upstream watermark is determined by the output watermarks of stages that provide elements to the stage.
+The input watermark is determined by the pending and
+in-progress elements for the stage.
+The output watermark is the point up to which the stage has fully processed all of its input elements.
+
+A stage's watermark progress depends on its pending and in-progress elements and timers, and the output watermarks of its upstream stages.
+As these change, more data becomes eligible for processing,
+and downstream stages can make progress.
+
+Elements are associated with an event time timestamp.
+Pending elements for a stage can be added to a bundle for processing when the stage's upstream watermark has advanced past the element's timestamp.
+
+So bundles are produced for a stage based on its watermark progress, with the elements that are ready at that watermark.
+
+A stage's watermarks are determined by its upstream stages, its current watermark state, and its current
+pending elements.
+
+At the start of a job, all stages are initialized to have watermarks at the minimum time, and impulse elements are added to their consuming stages.
+
+
+```mermaid
+sequenceDiagram
+loop Until Job Terminates
+  box Purple Element Manager
+    participant WE as Watermark Evaluation
+    participant RCL as RefreshConditionLock
+  end
+  box Green ProcessBundle
+    participant SS as Stage State
+  end
+
+  WE->>RCL: Lock()
+  activate RCL
+  Note over WE: Get stages that
have changed.
+
+  loop While there are no changed stages
+    WE->>RCL: Unlock()
+    deactivate RCL
+
+
+    Note over RCL: Wait for in-progress Bundles
to be persisted. + WE->>RCL: Wait() + activate RCL + Note over WE: Get stages that
have changed. + end + + WE->>SS: Refresh watermarks for changed stages. + activate SS + + Note over SS: Compute new watermarks. + SS->>WE: Return stages with advanced watermarks. + deactivate SS + + loop for each advanced stage + WE-->SS: Is stage ready to produce a bundle? + alt Produce Bundle + WE-->SS: Produce a bundle + activate SS + Note over SS: Find elements that
are ready to process + SS-->WE: Return bundle + deactivate SS + WE->>RCL: Unlock() + deactivate RCL + Note right of WE: Output bundle on channel. + WE->>RCL: Lock() + activate RCL + else + Note right of WE: continue with next stage + end + end + Note right of WE: Check if job can make progress. + WE->>RCL: Unlock() + deactivate RCL +end + +``` + ## Threading and Concurrency Model Prism leverages Go's lightweight threading primitive, called goroutines, to handle @@ -392,8 +482,14 @@ for SDK environments, they'll be on the same machine as Prism. The engine is unaware of individual user transforms, and relies on the calling job executor to configure how stages are related. * Bundle: An arbitrary non-empty set of elements, to be executed by a stage. +* Upstream Stages: Stages that provide input to the +current stage. Not all stages have upstream stages. +* Downstream Stages: Stages that depend on input from the current stage. Not alls tages have downstream stages. * Watermark: An event time which relates to the the readiness to process data in the engine. Each stage has several watermarks it tracks: Input, Output, and Upstream. + * Upstream Watermark: The minimum output watermark of all stages that provide input to this stage. + * Input watermark: The minumum event time of all elements pending or in progress for this stage. + * Output watermark: The maxiumum of the current output watermark, the estimated output watermark (if available), and the minimum of watermark holds. * Quiescense: Wether the pipeline is or is able to perform work. * The pipeline will try to advance all watermarks to infinity, and attempt to process all pending elements. From 161f62582276ff631c11a53fe8d3f329050d2eba Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 26 Sep 2024 09:22:52 -0700 Subject: [PATCH 06/10] rm walkthrough doc --- sdks/go/cmd/prism/WALKTHROUGH.md | 100 ------------------------------- 1 file changed, 100 deletions(-) delete mode 100644 sdks/go/cmd/prism/WALKTHROUGH.md diff --git a/sdks/go/cmd/prism/WALKTHROUGH.md b/sdks/go/cmd/prism/WALKTHROUGH.md deleted file mode 100644 index 0f93140896e4..000000000000 --- a/sdks/go/cmd/prism/WALKTHROUGH.md +++ /dev/null @@ -1,100 +0,0 @@ - - -# Prism Deep Dive Walkthrough - -The Apache Beam Go SDK Prism Runner is a portable local Beam runner, that aims to be -the best way to test and validate your pipelines outside of production. - -In short: It's too hard to make sure your pipeline will work the way you expect. - -Runners have different semantics, and implement different feature sets, and -have different behaviors. They're all bigger than expected. Some require -tuning to run properly locally. Some requires payment. - -We largely operate as though a user has synced to the repo, which shouldn't be the case for all users. - -# Runner Capabilities - -Dataflow has two different execution modes, which have their own execution characteristics. -Dataflow requires a cloud connection, and costs money. - -Dataflow doesn't support ProcessingTime in Batch. -Flink doesn't support ProcessingTime, and requires specific Java configuration. -Java Direct runner only works for Java, and requires specific Java configuration. -Python Direct/Portable runner doesn't handle streaming, and requires specific Python configuration. -Go Direct Runner has never received much development, because the Go SDK was built -against Dataflow and it's google internal counterpart, Flume. 
- -People references the Direct runner, but don't clarify which one, because language communities -tend to be insular by nature. - -Direct runners often cheat when executing their matching language SDKs. - -This is despite Beam providing cross language abilities by it's portable nature. - -The space is large and complicated, which we try to sort out with these large tables, showing -what capabilities SDKs have, versus what the runners are able to execute. - -When an error occurs, in many runners, it's not entirely clear what the remeadiation strategy is. -We've built Beam runners on top of Flink and Spark, but is the error from my pipeline, -from the Beam interface layers, or from the underlying engine? - -Dataflow has various logs, but one can sometimes need to be a Dataflow engineer to dig deeper into -an issue there. - -## Runners as a piece of SDK development - -Beam provides a model and set of interfaces for how to build and execute a data pipeline. - -We provide a guide for how to author a runner, which is generally tuned to how to wrap an existing runner, -and points to wrappers authored in Java. It doesn't really teach how to write a runner. - -We don't provide an SDK authoring Guide at all, and one either needs to already be a expert in the Beam model -or authored an existing SDK to be able to bootstrap the process. - -One also needs to be already comfortable with starting up a runner on the side to - execute against, if you manage to get far enough to submit jobs as portable beam pipelines. - -## History - -Development on Prism started in late 2022 as lostluck wanted an easier way to validate -Beam pipelines. In particular for the Go SDK. - -It was too hard to use a runner that was not built into the SDK. - -Each of the other available runners were not appropriate for this purpose, or required -heavier weight set up. - -It's much easier these days to set up Python and Java, but tooling is inconsistent for -those not immersed in it. - - -## Why in Go? - -Largely because Prism came out of an effort to make the Go SDK easier to test and adopt, it was a natural fit. - -But Go has advantages over Java and Python when it comes to portable use. - -Go compiles to small, static binaries. -Go can compile binaries cross platform trivially. -It's garbage collected, which meshes well with how Beam operates. - -While the Beam Java and Python SDK Worker containers need hundreds to thousands of megabytes of code and jars. - From 0db110266cb6824fcd1d2ab367c947cb3508f3f2 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 26 Sep 2024 09:51:17 -0700 Subject: [PATCH 07/10] Add section on bundle splitting. --- .../pkg/beam/runners/prism/internal/README.md | 82 +++++++++++++++---- 1 file changed, 64 insertions(+), 18 deletions(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/README.md b/sdks/go/pkg/beam/runners/prism/internal/README.md index 30dd525f2260..9e9e650cdd50 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/README.md +++ b/sdks/go/pkg/beam/runners/prism/internal/README.md @@ -74,7 +74,7 @@ Eventually these behaviors should be covered by using Prism in the main SDK test # Deep Dive -This section covers how Prism operates just above the code level, and addresses the +These sections cover how Prism operates just above the code level, and addresses the questions of "how it works". 
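+
+Before the details, here is a deliberately simplified Go sketch of the overall flow the
+following sections describe (the names, types, and channel shape here are illustrative
+assumptions, not Prism's actual code): bundles arrive over a channel from the
+ElementManager, each executes on its own goroutine with bounded parallelism, and any
+bundle failure fails the job.
+
+```go
+package prism
+
+import "sync"
+
+// bundle stands in for Prism's real bundle type.
+type bundle struct{ stageID string }
+
+// executeBundles drains ready bundles, running each on its own goroutine,
+// with at most maxParallel bundles in flight at once.
+func executeBundles(bundles <-chan bundle, process func(bundle) error) error {
+	const maxParallel = 8 // default cap on simultaneous bundles
+	sem := make(chan struct{}, maxParallel)
+	var wg sync.WaitGroup
+	var mu sync.Mutex
+	var firstErr error
+	for b := range bundles { // the channel closes when no more bundles can be produced
+		sem <- struct{}{} // acquire an execution slot
+		wg.Add(1)
+		go func(b bundle) {
+			defer wg.Done()
+			defer func() { <-sem }() // release the slot
+			if err := process(b); err != nil {
+				mu.Lock()
+				if firstErr == nil {
+					firstErr = err // a failed bundle fails the job
+				}
+				mu.Unlock()
+			}
+		}(b)
+	}
+	wg.Wait()
+	return firstErr
+}
+```
+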
## Job Execution @@ -89,7 +89,7 @@ sequenceDiagram X->>S: create create participant W as SDK Worker X->>W: create - create participant EM as ElementManager + create participant EM as ElementManager X->>EM: Configure Stages in ElementManager activate EM X->>+EM: Request Bundles @@ -147,12 +147,12 @@ needed for bundle processing. ### How are bundles produced? -Work is divided into Bundles for execution, typically, on end user DoFns within +Work is divided into Bundles for execution on end user DoFns within SDK workers. Producing bundles is the ElementManager's job. The ElementManager is the heart of Prism. The element manager tracks the state for each stage, which includes a -stage's relationships with others, the pending input elements, the various watermarks, +stage's relationships with others, the pending input elements, the various watermarks, whether it is stateful or aggregating. Each executing pipeline has it's own instance of the ElementManager which @@ -241,7 +241,7 @@ graph TD; ### Watermark Evaluation and Bundle Generation -Here's a closer look at the watermark evaluation goroutine. +Here's a closer look at the watermark evaluation goroutine. A key idea is that execution can't progress unless something has changed a stage. A stage can only change as a result of a bundle being persisted. @@ -255,7 +255,7 @@ Each stage has several watermarks: upstream, input, and output. The upstream watermark is determined by the output watermarks of stages that provide elements to the stage. The input watermark is determined by the pending and inprogress elements for the stage. -The output watermark is where we have determined we have fully processed all input elements. +The output watermark is where we have determined we have fully processed all input elements. A stage's watermark progress depends on its pending and in progress elements and timers, and the output watermark of its upstream stages. As these change this enables more data to be processed, @@ -264,7 +264,7 @@ and allow downstream stages to be processed. Elements are associated with an event time timestamp. Pending elements for a stage can be added to a bundle for processing, when the stage's upstream watermark has advanced passed the element's timestamp. -So Bundles are produced for a stage based on its watermark progress, with elements +So Bundles are produced for a stage based on its watermark progress, with elements A stage's watermarks are determined by it's upstream stages, it's current watermark state, and it's current pending elements. @@ -273,7 +273,7 @@ At the start of a job, all stages are initialized to have watermarks at the the ```mermaid -sequenceDiagram +sequenceDiagram loop Until Job Terminates box Purple Element Manager participant WE as Watermark Evaluation @@ -321,14 +321,60 @@ loop Until Job Terminates else Note right of WE: continue with next stage end - end + end Note right of WE: Check if job can make progress. WE->>RCL: Unlock() deactivate RCL end - + ``` +## Bundle Spliting + +In order to efficiently process data and scale, Beam Runners can use a combination +of two broad approaches to dividing work, Initial Splitting, and Dynamic Splitting. + +### Initial Splitting + +Initial Splitting is a part of bundle generation, and is decided before bundles +even begin processing. This should take into account the current state +of the pipeline, oustanding data to be processed, and the current load on the system. 
+Larger bundles require fewer "round trips" to the SDK and batch processing, but in +general, are processed serially by the runner, leading to higher latency for downstream +results. Smaller bundles may incur per bundle overhead more frequently, but can yeild lower +latency execution. + +Runners must strike a balance, to avoid over splitting, where per bundle overhead dominates, or +undersplitting, where available parallelism on the workers is going unused. + +Prism doesn't yet provide sophisticated initial splits outside of it's own testing harness. + +Bundle generation is greedy, as described above, without particular limits or parallelism goals. + +### Dynamic Splitting + +Dynamic Splitting is how a Beam runner can spread the processing load after a bundle starts execution. +It also has many different names, such as work stealing, or liquid-sharding. +The goal is to be able to complete work faster, by completing it in parallel. + +Prism's approach to dynamic splitting is naive, but does take into account previous bundles for +the stage. + +In short, if a bundle is making dicernable progress, then do not split. Otherwise, do split. + +In particular, it takes into account whether new inputs have been consumed, or +elements have been emitted to downstream PCollections between two BundleProgress requests. +If there hasn't, then a split request is made for half of the unprocessed work from the SDK. + +The progress interval for the stage (not simply this bundle) is then increased, to reduce +the frequency of splits if the stage is relatively slow at processing. +The stage's progress interval is decreased if bundles complete so quickly that no progress requests +can be made durinng the interval. + +Oversplitting can still occur for this approach, so https://github.com/apache/beam/issues/32538 +proposes incorporating the available execution parallelism into the decision of whether or not to +split a bundle. + ## Threading and Concurrency Model Prism leverages Go's lightweight threading primitive, called goroutines, to handle @@ -337,7 +383,7 @@ operating system threads for execution, and it's reasonably effective at this. Goroutines allow prism to distribute multiple bundles to SDK workers simultaneously. There are some benefits to re-use Goroutines, but not critically so. Generally the goal is to strategically limit parallelism to avoid -wasting time in the goroutine scheduler. +wasting time in the goroutine scheduler. For a single Job Goroutines are initialized to the following cardinatlities Per Job, Per Environment, and Per Bundle. @@ -361,7 +407,7 @@ in that Environment. These will persist for the lifetime of the job. After producing stages for the job the Job Executor goroutine will wait on Bundles from the ElementManager, and execute them in parallel. -Each bundle has it's own goroutine. +Each bundle has it's own goroutine. This is by default configured to a maximum of 8 simultaneous Bundles. The bundle goroutine will communicate with the worker associated with the bundle's stage. @@ -390,11 +436,11 @@ while the job is executing through the "Universal" runner handlers for a pipelin A job thus has the following number of persistent goroutines in execution: -For a Prism instance: +For a Prism instance: * G for the job services. 
For each Job: -* 1 for the Job Executor +* 1 for the Job Executor * 2 for the ElementManager For each Environment: @@ -434,7 +480,7 @@ accessed by the Bundle goroutines on persisting data back to the Element manager For best performance, we do as much work as possible in the Bundle Goroutines since they are what can most dynamically scale out, as bundle generation is handled by the single watermark evaluation thread. This may include migrating handling of data responses away from the Data -received goroutine and into the Bundle processing goroutine, so bundles are less likely to +received goroutine and into the Bundle processing goroutine, so bundles are less likely to block each other. As one G of goroutines is the single Job Management instance for prism itself, and the other is @@ -450,10 +496,10 @@ bundle at a time. An unbufferred channel puts a bottleneck on the job since there may be additional ready work to execute. On the other hand, it also allows for bundles to be made larger as more data -may have arrived. +may have arrived. The channel could be made to be buffered, to allow -multiple bundles to be prepared for execution. +multiple bundles to be prepared for execution. This would lead to lower latency as bundles could be made smaller, and faster to execute, as it would permit pipelineing in work generation, but may lead to higher lock contention and variability in execution. @@ -485,7 +531,7 @@ for SDK environments, they'll be on the same machine as Prism. * Upstream Stages: Stages that provide input to the current stage. Not all stages have upstream stages. * Downstream Stages: Stages that depend on input from the current stage. Not alls tages have downstream stages. -* Watermark: An event time which relates to the the readiness to process data in the engine. +* Watermark: An event time which relates to the the readiness to process data in the engine. Each stage has several watermarks it tracks: Input, Output, and Upstream. * Upstream Watermark: The minimum output watermark of all stages that provide input to this stage. * Input watermark: The minumum event time of all elements pending or in progress for this stage. From cd4151b3c18dc5fdbadb5dec2f07898c79995e49 Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 26 Sep 2024 09:51:27 -0700 Subject: [PATCH 08/10] ws lint --- sdks/go/pkg/beam/runners/prism/internal/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/README.md b/sdks/go/pkg/beam/runners/prism/internal/README.md index 9e9e650cdd50..307d5d5efab4 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/README.md +++ b/sdks/go/pkg/beam/runners/prism/internal/README.md @@ -342,7 +342,7 @@ of the pipeline, oustanding data to be processed, and the current load on the sy Larger bundles require fewer "round trips" to the SDK and batch processing, but in general, are processed serially by the runner, leading to higher latency for downstream results. Smaller bundles may incur per bundle overhead more frequently, but can yeild lower -latency execution. +latency execution. Runners must strike a balance, to avoid over splitting, where per bundle overhead dominates, or undersplitting, where available parallelism on the workers is going unused. From 55429ccde5d523fc56be419dc1f0dcfddae53eac Mon Sep 17 00:00:00 2001 From: lostluck <13907733+lostluck@users.noreply.github.com> Date: Thu, 26 Sep 2024 13:34:33 -0700 Subject: [PATCH 09/10] Jack's first comment pass. 
---
 .../pkg/beam/runners/prism/internal/README.md | 56 +++++++++----------
 1 file changed, 28 insertions(+), 28 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/README.md b/sdks/go/pkg/beam/runners/prism/internal/README.md
index 307d5d5efab4..9fb2b63280da 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/README.md
+++ b/sdks/go/pkg/beam/runners/prism/internal/README.md
@@ -40,7 +40,7 @@ Handler configurations are registered by dependant packages.
`urns` contains beam URN strings pulled from the protos. Leaf package.

`engine` contains the core manager for handling elements, watermarks, windowing strategies,
-timers, and state. Determines bundle readiness, and stages to execute. Leaf package.
+state, and timers. Determines bundle readiness, and stages to execute. Leaf package.
Should never depend on the protocol buffers.

`jobservices` contains GRPC service handlers for job management and submission.
@@ -127,22 +127,22 @@ SplittableDoFns into their executable components. Preprocessing returns
a set of stateless executable stages.

Stages in hand, the job executor produces SDK workers for each environment,
-and configures an ElementManager with the stage information, so it can
+and configures an `ElementManager` with the stage information, so it can
begin to produce and manage bundles. Those bundles are then handed to
appropriate workers for processing.

-The ElementManager will produce as many bundles as are ready for execution,
+The `ElementManager` will produce as many bundles as are ready for execution,
with respect to necessary restrictions on processing. For example, stateful stages
may require that only a single in-progress bundle may operate on a given user key
at a time, while aggregations like GroupByKey will only execute when their
windowing strategy dictates, and DoFns with side inputs can only execute when
all appropriate side inputs are ready.

-Architecturally, the ElementManager is only aware of the properties of the
+Architecturally, the `ElementManager` is only aware of the properties of the
fused stages, and not their actual relationships with the Beam Protocol Buffers.
-The ElementManager is not aware of individual transforms.
+The `ElementManager` is not aware of individual transforms.

-Interfacing with SDK workers is left to the stateless job executore stages as
+Interfacing with SDK workers is left to the stateless job executor stages as
needed for bundle processing.

### How are bundles produced?
@@ -150,12 +150,12 @@ needed for bundle processing.

Work is divided into Bundles for execution on end-user DoFns within SDK
workers.

-Producing bundles is the ElementManager's job. The ElementManager is the heart
-of Prism. The element manager tracks the state for each stage, which includes a
+Producing bundles is the `ElementManager`'s job. The `ElementManager` is the heart
+of Prism. The `ElementManager` tracks the state for each stage, which includes a
stage's relationships with others, the pending input elements, the various watermarks,
-whether it is stateful or aggregating.
+and whether it is stateful or aggregating.

-Each executing pipeline has it's own instance of the ElementManager which
+Each executing pipeline has its own instance of the `ElementManager`, which
manages all element data for the pipeline.
The core loop of the engine is to produce bundles for stages to process. A bundle
represents some number of elements as well as a stage to process them on.
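To make that pairing concrete, here is a minimal, runnable Go sketch. Every name in it
(`element`, `bundle`, `nextBundle`) is invented for this walkthrough; Prism's actual engine
types are richer, also tracking windows, keys, timers, and watermark metadata.

```go
package main

import "fmt"

// element and bundle are invented for this sketch, not the engine's
// actual types; they show only the minimal shape described above.
type element struct {
	eventTime int64
	data      string
}

// A bundle pairs some number of pending elements with the stage that
// should process them.
type bundle struct {
	stageID  string
	elements []element
}

// nextBundle greedily drains everything currently pending for a stage
// into a single bundle, returning nil when nothing is ready.
func nextBundle(stageID string, pending *[]element) *bundle {
	if len(*pending) == 0 {
		return nil
	}
	b := &bundle{stageID: stageID, elements: *pending}
	*pending = nil
	return b
}

func main() {
	pending := []element{{eventTime: 1, data: "a"}, {eventTime: 2, data: "b"}}
	if b := nextBundle("stage-1", &pending); b != nil {
		fmt.Printf("bundle for %s with %d elements\n", b.stageID, len(b.elements))
	}
}
```

The greedy drain mirrors the behavior described above: the engine hands out as many ready
elements as it can per bundle, rather than targeting a fixed bundle size.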
@@ -192,7 +192,7 @@ graph TD;
     Terminate
     Job as Failed
     ")
     end
-  subgraph EM [Element Manager]
+  subgraph EM [ElementManager]
     Bundles["
     Bundles()
     "]
@@ -275,7 +275,7 @@ At the start of a job, all stages are initialized to have watermarks at the the
 ```mermaid
 sequenceDiagram
     loop Until Job Terminates
-    box Purple Element Manager
+    box Purple ElementManager
     participant WE as Watermark Evaluation
     participant RCL as RefreshConditionLock
     end
@@ -355,12 +355,12 @@ Bundle generation is greedy, as described above, without particular limits or pa

Dynamic Splitting is how a Beam runner can spread the processing load after a bundle starts execution.
It also has many different names, such as work stealing, or liquid-sharding.
-The goal is to be able to complete work faster, by completing it in parallel.
+The goal is to be able to complete work faster by completing it in parallel.

Prism's approach to dynamic splitting is naive, but does take into account previous bundles for
the stage.

-In short, if a bundle is making dicernable progress, then do not split. Otherwise, do split.
+In short, if a bundle is making discernable progress, then do not split. Otherwise, do split.

In particular, it takes into account whether new inputs have been consumed, or
elements have been emitted to downstream PCollections between two BundleProgress requests.
@@ -385,17 +385,17 @@ There are some benefits to re-use Goroutines, but not critically so.
Generally the goal is to strategically limit parallelism to avoid
wasting time in the goroutine scheduler.

-For a single Job Goroutines are initialized to the following cardinatlities
+For a single Job, Goroutines are initialized to the following cardinalities
Per Job, Per Environment, and Per Bundle.

This section will attempt to outline the threads associated with Prism when
-executing a job. This will not include SDK side threads, such as those
+executing a job. This will not include SDK-side threads, such as those
within containers, or started by an external worker service.

As a rule of thumb, each Bundle is processed on an independent goroutine, with
a few exceptions.

-jobservices.Server implements a beam JobManagmenent GRPC services. GRPC servers
+jobservices.Server implements a Beam JobManagement GRPC service. GRPC servers
have goroutines managed by GRPC itself. Call this G goroutines.

When RunJob is called, the server starts a goroutine with the Job Executor function.
@@ -406,7 +406,7 @@ and one for the FnAPI server for that Environment, to communicate with the SDK W
in that Environment. These will persist for the lifetime of the job.

After producing stages for the job the Job Executor goroutine will wait on Bundles
-from the ElementManager, and execute them in parallel.
+from the `ElementManager`, and execute them in parallel.
Each bundle has its own goroutine.
This is by default configured to a maximum of 8 simultaneous Bundles.

@@ -415,13 +415,13 @@ This handles sending the bundle for execution on the SDK worker, managing progre
split requests, and persisting data and other results after the bundle completes.

The FnAPI server-side worker will have several goroutines to handle requests and responses
-from the SDK worker. FnAPI services are typically bi directional streaming RPCs.
+from the SDK worker. FnAPI services are typically bi-directional streaming RPCs.
There will be one goroutine handling requests being sent to the SDK, and another for
responses from the SDK.
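The runnable Go sketch below illustrates that request/response pairing; plain channels
stand in for the generated FnAPI stream types, and all names here are invented for this
walkthrough rather than taken from the worker package.

```go
package main

import "fmt"

// stream stands in for a bi-directional gRPC stream; the real FnAPI
// streams are generated protobuf types, which this sketch does not use.
type stream struct {
	toSDK   chan string
	fromSDK chan string
}

// servePair starts the paired goroutines: one pushes requests onto the
// stream, the other drains responses, so neither direction blocks the other.
func servePair(s stream, requests <-chan string, responses chan<- string) {
	go func() {
		for req := range requests {
			s.toSDK <- req
		}
		close(s.toSDK) // no more requests; let the worker side finish
	}()
	go func() {
		for resp := range s.fromSDK {
			responses <- resp
		}
		close(responses) // worker side closed; signal consumers
	}()
}

func main() {
	s := stream{toSDK: make(chan string), fromSDK: make(chan string)}
	// A stand-in SDK worker that acknowledges each request.
	go func() {
		for req := range s.toSDK {
			s.fromSDK <- "ack:" + req
		}
		close(s.fromSDK)
	}()
	requests := make(chan string, 2)
	responses := make(chan string)
	requests <- "process_bundle_1"
	requests <- "process_bundle_2"
	close(requests)
	servePair(s, requests, responses)
	for resp := range responses {
		fmt.Println(resp)
	}
}
```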
So there will be a pair of goroutines for each of the Control, Data, and State streams,
and a single one for Logging.

-The ElementManager returns a channel to the Job Executor in order to relay those bundles, and
+The `ElementManager` returns a channel to the Job Executor in order to relay those bundles, and
starts two goroutines in order to populate that channel with bundles.
One goroutine is tasked with evaluating and refreshing the watermark whenever a stage persists a
bundle. This goroutine sends bundles on the channel when appropriate. It waits on
@@ -441,7 +441,7 @@ For a Prism instance:

For each Job:
* 1 for the Job Executor
-* 2 for the ElementManager
+* 2 for the `ElementManager`

For each Environment:
* 2 for the Environment itself
@@ -473,9 +473,9 @@ FnAPI messages are multiplexed so the expectation is the data service goroutines
be the busiest moving data back and forth from the SDK.

A consequence of this approach is the need to take care in locking shared resources and data
-when they may be accessed by multiple goroutines. In particular, is all done in the ElementManager
+when they may be accessed by multiple goroutines. This,Iin particular, is all done in the `ElementManager`
which has locks for each stage in order to serialize access to its state. This state is notably
-accessed by the Bundle goroutines on persisting data back to the Element manager.
+accessed by the Bundle goroutines on persisting data back to the `ElementManager`.

For best performance, we do as much work as possible in the Bundle Goroutines since
they are what can most dynamically scale out, as bundle generation is handled by the single watermark
@@ -488,7 +488,7 @@ for the worker endpoint, these are no wasted either. Prism can assign uniques na
to be able to identify which job they're a part of, so it's possible to avoid the per-job and worker
GRPC service, and multiplex from there. (See https://github.com/apache/beam/issues/32167)

-A channel is being used to move ready to execute bundles from the ElementManager to the Job Executor.
+A channel is being used to move ready-to-execute bundles from the `ElementManager` to the Job Executor.

This may be unbuffered (the default) which means serializing how bundles are
generated for execution, and there being at most a single "readyToExecute"
@@ -506,18 +506,18 @@ to higher lock contention and variability in execution.

## Durability Model

-Prism keeps all data in memory, and doesn't write anything durably to disk.
+Prism keeps all data in memory and doesn't write anything durably to disk.
It is recommended only for testing purposes. It's not suitable for any long-term
production work due to the risk of data loss or recomputation.

-SDK side code however may be make durable changes or side effects in external systems.
+SDK-side code, however, may make durable changes or side effects in external systems.
Prism cannot restrict such behavior.

-Prism doesn't retry failed bundle, and simply fails the job instead.
+Prism doesn't retry failed bundles, and simply fails the job instead.

## Execution Distribution Model

-Prism is intended to be single machine, local runner. If it does start up docker containers
+Prism is intended to be a single-machine, local runner. If it does start up docker containers
for SDK environments, they'll be on the same machine as Prism.
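As a small, self-contained illustration of the bundle-channel handoff discussed in the
threading section above: with an unbuffered channel the generating goroutine blocks until
the Job Executor accepts a bundle, while a buffer would let generation run ahead. The names
here are invented for this walkthrough, not taken from the engine package.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const buffer = 0 // 0 models the current unbuffered default; try 4 to pipeline
	bundles := make(chan int, buffer)

	// Stands in for the watermark-evaluation goroutine generating bundles.
	go func() {
		for i := 1; i <= 3; i++ {
			fmt.Printf("generated bundle %d\n", i)
			bundles <- i // with buffer == 0, blocks until the executor receives
		}
		close(bundles)
	}()

	// Stands in for the Job Executor receiving and running bundles.
	for b := range bundles {
		time.Sleep(10 * time.Millisecond) // simulate bundle execution
		fmt.Printf("executed bundle %d\n", b)
	}
}
```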
# Glossary
From d1957b2869d71e1af7a3c0bbdf0b3def5abd2fd6 Mon Sep 17 00:00:00 2001
From: Robert Burke <lostluck@users.noreply.github.com>
Date: Thu, 3 Oct 2024 10:28:13 -0700
Subject: [PATCH 10/10] Apply suggestions from code review

Co-authored-by: Jack McCluskey <34928439+jrmccluskey@users.noreply.github.com>
---
 sdks/go/pkg/beam/runners/prism/internal/README.md | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sdks/go/pkg/beam/runners/prism/internal/README.md b/sdks/go/pkg/beam/runners/prism/internal/README.md
index 9fb2b63280da..684d0a80c516 100644
--- a/sdks/go/pkg/beam/runners/prism/internal/README.md
+++ b/sdks/go/pkg/beam/runners/prism/internal/README.md
@@ -63,8 +63,8 @@ If no other option exists, it may use direct access to exported data from the ot
## Testing

The sub packages should have reasonable Unit Test coverage in their own directories, but
-most features will be exercised via executing pipelines in the internal package, and engine
-package. To avoid overly re-implementing tests, use of the "_test" formalism to validate
+most features will be exercised via executing pipelines in the internal and engine
+packages. To avoid overly re-implementing tests, use of the "_test" formalism to validate
coverage through re-using tests from sdks/go/tests is recommended.

For the time being, test DoFns should be added to the standard build in order to validate execution
@@ -341,7 +341,7 @@ even begin processing. This should take into account the current state
of the pipeline, outstanding data to be processed, and the current load on the system.
Larger bundles require fewer "round trips" to the SDK and batch processing, but in
general, are processed serially by the runner, leading to higher latency for downstream
-results. Smaller bundles may incur per bundle overhead more frequently, but can yeild lower
+results. Smaller bundles may incur per bundle overhead more frequently, but can yield lower
latency execution.

Runners must strike a balance, to avoid over splitting, where per bundle overhead dominates, or
@@ -473,7 +473,7 @@ FnAPI messages are multiplexed so the expectation is the data service goroutines
be the busiest moving data back and forth from the SDK.

A consequence of this approach is the need to take care in locking shared resources and data
-when they may be accessed by multiple goroutines. This,Iin particular, is all done in the `ElementManager`
+when they may be accessed by multiple goroutines. This, in particular, is all done in the `ElementManager`
which has locks for each stage in order to serialize access to its state. This state is notably
accessed by the Bundle goroutines on persisting data back to the `ElementManager`.
@@ -484,7 +484,7 @@ received goroutine and into the Bundle processing goroutine, so bundles are less
block each other.

As one G of goroutines is the single Job Management instance for Prism itself, and the other is
-for the worker endpoint, these are no wasted either. Prism can assign uniques names to workers
+for the worker endpoint, these are not wasted either. Prism can assign unique names to workers
to be able to identify which job they're a part of, so it's possible to avoid the per-job and worker
GRPC service, and multiplex from there. (See https://github.com/apache/beam/issues/32167)
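To close, a minimal Go sketch of the per-stage locking discipline discussed in the hunks
above: each stage serializes access to its own state with its own mutex, so concurrent
bundle goroutines persisting results contend only per stage, not globally. The types are
invented for this walkthrough and are not the engine's actual state structures.

```go
package main

import (
	"fmt"
	"sync"
)

// stageState is invented for this sketch; the engine's real per-stage
// state also holds elements, watermarks, and timers.
type stageState struct {
	mu      sync.Mutex
	results []string
}

// persist is what a bundle goroutine might call when writing back results;
// the stage's mutex serializes the write.
func (s *stageState) persist(results ...string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.results = append(s.results, results...)
}

func main() {
	stages := map[string]*stageState{"stageA": {}, "stageB": {}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // four concurrent "bundle" goroutines
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			stages["stageA"].persist(fmt.Sprintf("bundle%d", n))
		}(i)
	}
	wg.Wait()
	fmt.Println(len(stages["stageA"].results), "results persisted for stageA")
}
```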