diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 658978fd7dd6..95e1bc814dc9 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -816,7 +816,10 @@ def _add_argparse_args(cls, parser):
     parser.add_argument(
         '--environment_type', default=None,
         help=('Set the default environment type for running '
-              'user code. Possible options are DOCKER and PROCESS.'))
+              'user code. DOCKER (default) runs user code in a container. '
+              'PROCESS runs user code in processes that are automatically '
+              'started on each worker node. LOOPBACK runs user code on the '
+              'same process that originally submitted the job.'))
     parser.add_argument(
         '--environment_config', default=None,
         help=('Set environment configuration for running the user code.\n For '
diff --git a/website/src/documentation/runners/flink.md b/website/src/documentation/runners/flink.md
index b52d026784a5..9bcb149454e2 100644
--- a/website/src/documentation/runners/flink.md
+++ b/website/src/documentation/runners/flink.md
@@ -247,13 +247,10 @@ If you have a Flink `JobManager` running on your local machine you can provide `
 As of now you will need a copy of Apache Beam's source code. You can
 download it on the [Downloads page]({{ site.baseurl
 }}/get-started/downloads/). In the future there will be pre-built Docker images
-available.
+available. To run a pipeline on an embedded Flink cluster:
 
-1. *Only required once:* Build the SDK harness container (optionally replace py35 with the Python version of your choice): `./gradlew :sdks:python:container:py35:docker`
-
-
-2. Start the JobService endpoint: `./gradlew :runners:flink:1.5:job-server:runShadow`
+1. Start the JobService endpoint: `./gradlew :runners:flink:1.5:job-server:runShadow`
 
@@ -263,14 +260,18 @@ To execute the job on a Flink cluster, the Beam JobService needs to be
 provided with the Flink JobManager address.
 
-3. Submit the Python pipeline to the above endpoint by using the `PortableRunner` and `job_endpoint` set to `localhost:8099` (this is the default address of the JobService). For example:
+2. Submit the Python pipeline to the above endpoint by using the `PortableRunner`, `job_endpoint` set to `localhost:8099` (this is the default address of the JobService), and `environment_type` set to `LOOPBACK`. For example:
 
 ```py
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions
-options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])
+options = PipelineOptions([
+    "--runner=PortableRunner",
+    "--job_endpoint=localhost:8099",
+    "--environment_type=LOOPBACK"
+])
 with beam.Pipeline(options) as p:
     ...
 ```
@@ -286,6 +287,8 @@ To run on a separate [Flink cluster](https://ci.apache.org/projects/flink/flink-
 
 3. Submit the pipeline as above.
 
+Note however that `environment_type=LOOPBACK` is only intended for local testing.
+See [here]({{ site.baseurl }}/roadmap/portability/#sdk-harness-config) for details.
 
 As of Beam 2.15.0, steps 2 and 3 can be automated in Python by using the `FlinkRunner`,
@@ -296,7 +299,12 @@ plus the optional `flink_version` and `flink_master_url` options if required, i.e.
 ```py
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions
-options = PipelineOptions(["--runner=FlinkRunner", "--flink_version=1.8", "--flink_master_url=localhost:8081"])
+options = PipelineOptions([
+    "--runner=FlinkRunner",
+    "--flink_version=1.8",
+    "--flink_master_url=localhost:8081",
+    "--environment_type=LOOPBACK"
+])
 with beam.Pipeline(options) as p:
     ...
 ```
diff --git a/website/src/documentation/runners/spark.md b/website/src/documentation/runners/spark.md
index 9edff5efdfca..fa48df67099c 100644
--- a/website/src/documentation/runners/spark.md
+++ b/website/src/documentation/runners/spark.md
@@ -164,10 +164,7 @@ download it on the [Downloads page]({{ site.baseurl
 available.
 
-1. *Only required once:* Build the SDK harness container (optionally replace py35 with the Python version of your choice): `./gradlew :sdks:python:container:py35:docker`
-
-
-2. Start the JobService endpoint: `./gradlew :runners:spark:job-server:runShadow`
+1. Start the JobService endpoint: `./gradlew :runners:spark:job-server:runShadow`
 
@@ -177,17 +174,20 @@ job. To execute the job on a Spark cluster, the Beam JobService needs to be
 provided with the Spark master address.
 
-3. Submit the Python pipeline to the above endpoint by using the `PortableRunner` and `job_endpoint` set to `localhost:8099` (this is the default address of the JobService). For example:
+2. Submit the Python pipeline to the above endpoint by using the `PortableRunner`, `job_endpoint` set to `localhost:8099` (this is the default address of the JobService), and `environment_type` set to `LOOPBACK`. For example:
 
 ```py
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions
-options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])
-p = beam.Pipeline(options)
-..
-p.run()
+options = PipelineOptions([
+    "--runner=PortableRunner",
+    "--job_endpoint=localhost:8099",
+    "--environment_type=LOOPBACK"
+])
+with beam.Pipeline(options) as p:
+    ...
 ```
 
 ### Running on a pre-deployed Spark cluster
@@ -202,6 +202,13 @@ For more details on the different deployment modes see: [Standalone](http://spar
 
 3. Submit the pipeline as above.
 
+Note however that `environment_type=LOOPBACK` is only intended for local testing.
+See [here]({{ site.baseurl }}/roadmap/portability/#sdk-harness-config) for details.
+
+
+
+(Note that, depending on your cluster setup, you may need to change the `environment_type` option.
+See [here]({{ site.baseurl }}/roadmap/portability/#sdk-harness-config) for details.)
 
 ## Pipeline options for the Spark Runner
diff --git a/website/src/roadmap/portability.md b/website/src/roadmap/portability.md
index 89c61ab4c239..b1d8a75f7a48 100644
--- a/website/src/roadmap/portability.md
+++ b/website/src/roadmap/portability.md
@@ -151,27 +151,35 @@ for details.
 
 ### Running Python wordcount on Flink {#python-on-flink}
 
-To run a basic Python wordcount (in batch mode) with embedded Flink:
-
-1. Run once to build the SDK harness container (optionally replace py35 with the Python version of your choice): `./gradlew :sdks:python:container:py35:docker`
-2. Start the Flink portable JobService endpoint: `./gradlew :runners:flink:1.5:job-server:runShadow`
-3. In a new terminal, submit the wordcount pipeline to above endpoint: `./gradlew portableWordCount -PjobEndpoint=localhost:8099 -PenvironmentType=LOOPBACK`
-
-To run the pipeline in streaming mode: `./gradlew portableWordCount -PjobEndpoint=localhost:8099 -Pstreaming`
-
+The Beam Flink runner can run Python pipelines in batch and streaming modes. Please see the [Flink Runner page]({{ site.baseurl }}/documentation/runners/flink/) for more information on how to run portable pipelines on top of Flink.
 
 ### Running Python wordcount on Spark {#python-on-spark}
 
-To run a basic Python wordcount (in batch mode) with embedded Spark:
-
-1. Run once to build the SDK harness container: `./gradlew :sdks:python:container:docker`
-2. Start the Spark portable JobService endpoint: `./gradlew :runners:spark:job-server:runShadow`
-3. In a new terminal, submit the wordcount pipeline to above endpoint: `./gradlew portableWordCount -PjobEndpoint=localhost:8099 -PenvironmentType=LOOPBACK`
-
-Python streaming mode is not yet supported on Spark.
-
+The Beam Spark runner can run Python pipelines in batch mode. Please see the [Spark Runner page]({{ site.baseurl }}/documentation/runners/spark/) for more information on how to run portable pipelines on top of Spark.
+Python streaming mode is not yet supported on Spark.
+
+## SDK Harness Configuration {#sdk-harness-config}
+
+The Beam Python SDK allows configuration of the SDK harness to accommodate varying cluster setups.
+
+- `environment_type` determines where user code will be executed.
+  - `LOOPBACK`: User code is executed within the same process that submitted the pipeline. This
+    option is useful for local testing. However, it is not suitable for a production environment,
+    as it requires a connection between the original Python process and the worker nodes, and
+    performs work on the machine the job originated from, not the worker nodes.
+  - `PROCESS`: User code is executed by processes that are automatically started by the runner on
+    each worker node.
+  - `DOCKER` (default): User code is executed within a container started on each worker node.
+    This requires docker to be installed on worker nodes. For more information, see
+    [here](https://github.com/apache/beam/blob/master/sdks/CONTAINERS.md).
+- `environment_config` configures the environment depending on the value of `environment_type`.
+  - When `environment_type=DOCKER`: URL for the Docker container image.
+  - When `environment_type=PROCESS`: JSON of the form `{"os": "<OS>", "arch": "<ARCHITECTURE>",
+    "command": "<process to execute>", "env":{"<Environment variables 1>": "<ENV_VAL>"} }`. All
+    fields in the JSON are optional except `command`.
+- `sdk_worker_parallelism` sets the number of SDK workers that will run on each worker node.
\ No newline at end of file
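For reference, a minimal sketch (not part of the patch above) of how the options documented in the new `#sdk-harness-config` section fit together when targeting a `PROCESS` environment. The job endpoint, boot command path, and parallelism value here are illustrative placeholders, not values taken from this diff:

```py
# Illustrative sketch only: wiring together the portable pipeline options
# documented above. The endpoint, command path, and parallelism value are
# hypothetical placeholders; adjust them for your cluster.
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# environment_config for PROCESS is a JSON string; only "command" is required.
process_config = json.dumps({"command": "/path/to/sdk_worker_boot"})

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",        # default JobService address
    "--environment_type=PROCESS",
    "--environment_config=" + process_config,
    "--sdk_worker_parallelism=1",           # SDK workers per worker node
])

with beam.Pipeline(options=options) as p:
    p | beam.Create(["hello", "world"]) | beam.Map(print)
```

For local testing, swapping `PROCESS` for `LOOPBACK` and dropping `environment_config` gives the setup used in the Flink and Spark examples above.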