5 changes: 4 additions & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
@@ -816,7 +816,10 @@ def _add_argparse_args(cls, parser):
parser.add_argument(
'--environment_type', default=None,
help=('Set the default environment type for running '
'user code. Possible options are DOCKER and PROCESS.'))
'user code. DOCKER (default) runs user code in a container. '
'PROCESS runs user code in processes that are automatically '
'started on each worker node. LOOPBACK runs user code in the '
'same process that originally submitted the job.'))
parser.add_argument(
'--environment_config', default=None,
help=('Set environment configuration for running the user code.\n For '
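As a side note, here is a minimal sketch of how the flag documented above could be parsed and inspected from Python. It assumes the flag is registered on `PortableOptions`, which this hunk does not show:

```py
from apache_beam.options.pipeline_options import PipelineOptions, PortableOptions

# Sketch: parse the flag described above and read it back. This assumes the
# flag belongs to PortableOptions, which is not visible in this hunk.
opts = PipelineOptions(["--environment_type=LOOPBACK"])
portable = opts.view_as(PortableOptions)
print(portable.environment_type)    # 'LOOPBACK'
print(portable.environment_config)  # None unless --environment_config is set
```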
24 changes: 16 additions & 8 deletions website/src/documentation/runners/flink.md
@@ -247,13 +247,10 @@ If you have a Flink `JobManager` running on your local machine you can provide `
As of now you will need a copy of Apache Beam's source code. You can
download it on the [Downloads page]({{ site.baseurl
}}/get-started/downloads/). In the future there will be pre-built Docker images
available.
available. To run a pipeline on an embedded Flink cluster:
</span>

<span class="language-py">1. *Only required once:* Build the SDK harness container (optionally replace py35 with the Python version of your choice): `./gradlew :sdks:python:container:py35:docker`
</span>

<span class="language-py">2. Start the JobService endpoint: `./gradlew :runners:flink:1.5:job-server:runShadow`
<span class="language-py">1. Start the JobService endpoint: `./gradlew :runners:flink:1.5:job-server:runShadow`
</span>

<span class="language-py">
@@ -263,14 +260,18 @@ To execute the job on a Flink cluster, the Beam JobService needs to be
provided with the Flink JobManager address.
</span>

<span class="language-py">3. Submit the Python pipeline to the above endpoint by using the `PortableRunner` and `job_endpoint` set to `localhost:8099` (this is the default address of the JobService). For example:
<span class="language-py">2. Submit the Python pipeline to the above endpoint by using the `PortableRunner`, `job_endpoint` set to `localhost:8099` (this is the default address of the JobService), and `environment_type` set to `LOOPBACK`. For example:
</span>

```py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])
options = PipelineOptions([
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_type=LOOPBACK"
])
with beam.Pipeline(options=options) as p:
...
```
@@ -286,6 +287,8 @@ To run on a separate [Flink cluster](https://ci.apache.org/projects/flink/flink-
</span>

<span class="language-py">3. Submit the pipeline as above.
Note however that `environment_type=LOOPBACK` is only intended for local testing.
See [here]({{ site.baseurl }}/roadmap/portability/#sdk-harness-config) for details.
[Review thread]

**Contributor:** Should mention here that the default environment_type is DOCKER and there is no need to specify anything for environment_type if that deployment option is ok.

**Author:** I changed the instructions to use loopback, so docker would be a change from that.

**Contributor:** I saw that. I just think that it would be fair to point out the default is Docker and the LOOPBACK is just for experimentation.

**Author:** Okay, I made that more explicit.
</span>
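As the review thread above notes, `DOCKER` is the default environment and `LOOPBACK` is meant only for experimentation. A minimal sketch of a submission without the `LOOPBACK` override, assuming the JobService from step 1 is still listening on its default address (this example is illustrative, not part of the PR itself):

```py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch: omitting --environment_type falls back to the default DOCKER
# environment, which starts an SDK harness container on each worker node.
# "localhost:8099" assumes the JobService from step 1 is still reachable.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
])
with beam.Pipeline(options=options) as p:
    ...
```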

<span class="language-py">As of Beam 2.15.0, steps 2 and 3 can be automated in Python by using the `FlinkRunner`,
@@ -296,7 +299,12 @@ plus the optional `flink_version` and `flink_master_url` options if required, i.e.:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(["--runner=FlinkRunner", "--flink_version=1.8", "--flink_master_url=localhost:8081"])
options = PipelineOptions([
"--runner=FlinkRunner",
"--flink_version=1.8",
"--flink_master_url=localhost:8081",
"--environment_type=LOOPBACK"
])
with beam.Pipeline(options=options) as p:
...
```
25 changes: 16 additions & 9 deletions website/src/documentation/runners/spark.md
@@ -164,10 +164,7 @@ download it on the [Downloads page]({{ site.baseurl
available.
</span>

<span class="language-py">1. *Only required once:* Build the SDK harness container (optionally replace py35 with the Python version of your choice): `./gradlew :sdks:python:container:py35:docker`
</span>

<span class="language-py">2. Start the JobService endpoint: `./gradlew :runners:spark:job-server:runShadow`
<span class="language-py">1. Start the JobService endpoint: `./gradlew :runners:spark:job-server:runShadow`
</span>

<span class="language-py">
@@ -177,17 +174,20 @@ job. To execute the job on a Spark cluster, the Beam JobService needs to be
provided with the Spark master address.
</span>

<span class="language-py">3. Submit the Python pipeline to the above endpoint by using the `PortableRunner` and `job_endpoint` set to `localhost:8099` (this is the default address of the JobService). For example:
<span class="language-py">2. Submit the Python pipeline to the above endpoint by using the `PortableRunner`, `job_endpoint` set to `localhost:8099` (this is the default address of the JobService), and `environment_type` set to `LOOPBACK`. For example:
</span>

```py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])
p = beam.Pipeline(options)
..
p.run()
options = PipelineOptions([
"--runner=PortableRunner",
"--job_endpoint=localhost:8099",
"--environment_type=LOOPBACK"
])
with beam.Pipeline(options=options) as p:
...
```

### Running on a pre-deployed Spark cluster
@@ -202,6 +202,13 @@ For more details on the different deployment modes see: [Standalone](http://spar
</span>

<span class="language-py">3. Submit the pipeline as above.
Note however that `environment_type=LOOPBACK` is only intended for local testing.
See [here]({{ site.baseurl }}/roadmap/portability/#sdk-harness-config) for details.
</span>

<span class="language-py">
(Note that, depending on your cluster setup, you may need to change the `environment_type` option.
See [here]({{ site.baseurl }}/roadmap/portability/#sdk-harness-config) for details.)
</span>
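For a pre-deployed cluster, one hedged option is to stay with the default `DOCKER` environment and point `environment_config` at an SDK harness image the worker nodes can pull. The image name below is a placeholder, not something published by this PR:

```py
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Sketch: rely on the default DOCKER environment and pass a placeholder
# SDK harness image via --environment_config. Replace the image name with
# one that the Spark worker nodes can actually pull.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=DOCKER",
    "--environment_config=example.registry/beam/python3-sdk-harness:latest",
])
with beam.Pipeline(options=options) as p:
    ...
```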

## Pipeline options for the Spark Runner
40 changes: 24 additions & 16 deletions website/src/roadmap/portability.md
@@ -151,27 +151,35 @@ for details.

### Running Python wordcount on Flink {#python-on-flink}

To run a basic Python wordcount (in batch mode) with embedded Flink:

1. Run once to build the SDK harness container (optionally replace py35 with the Python version of your choice): `./gradlew :sdks:python:container:py35:docker`
2. Start the Flink portable JobService endpoint: `./gradlew :runners:flink:1.5:job-server:runShadow`
3. In a new terminal, submit the wordcount pipeline to above endpoint: `./gradlew portableWordCount -PjobEndpoint=localhost:8099 -PenvironmentType=LOOPBACK`

To run the pipeline in streaming mode: `./gradlew portableWordCount -PjobEndpoint=localhost:8099 -Pstreaming`

The Beam Flink runner can run Python pipelines in batch and streaming modes.
Please see the [Flink Runner page]({{ site.baseurl }}/documentation/runners/flink/) for more information on
how to run portable pipelines on top of Flink.

### Running Python wordcount on Spark {#python-on-spark}

To run a basic Python wordcount (in batch mode) with embedded Spark:

1. Run once to build the SDK harness container: `./gradlew :sdks:python:container:docker`
2. Start the Spark portable JobService endpoint: `./gradlew :runners:spark:job-server:runShadow`
3. In a new terminal, submit the wordcount pipeline to above endpoint: `./gradlew portableWordCount -PjobEndpoint=localhost:8099 -PenvironmentType=LOOPBACK`

Python streaming mode is not yet supported on Spark.

The Beam Spark runner can run Python pipelines in batch mode.
Please see the [Spark Runner page]({{ site.baseurl }}/documentation/runners/spark/) for more information on
how to run portable pipelines on top of Spark.

Python streaming mode is not yet supported on Spark.

## SDK Harness Configuration {#sdk-harness-config}

The Beam Python SDK allows configuration of the SDK harness to accommodate varying cluster setups.

- `environment_type` determines where user code will be executed.
- `LOOPBACK`: User code is executed within the same process that submitted the pipeline. This
option is useful for local testing. However, it is not suitable for a production environment,
as it requires a connection between the original Python process and the worker nodes, and
performs work on the machine the job originated from, not the worker nodes.
- `PROCESS`: User code is executed by processes that are automatically started by the runner on
each worker node.
- `DOCKER` (default): User code is executed within a container started on each worker node.
This requires docker to be installed on worker nodes. For more information, see
[here](https://github.com/apache/beam/blob/master/sdks/CONTAINERS.md).
- `environment_config` configures the environment depending on the value of `environment_type`.
- When `environment_type=DOCKER`: URL for the Docker container image.
- When `environment_type=PROCESS`: JSON of the form `{"os": "<OS>", "arch": "<ARCHITECTURE>", "command": "<process to execute>", "env": {"<ENV_VAR>": "<ENV_VAL>"}}`. All fields in the JSON are optional except `command` (see the sketch after this list).
- `sdk_worker_parallelism` sets the number of SDK workers that will run on each worker node.
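
To make the `PROCESS` form of `environment_config` concrete, here is a minimal sketch; the worker command path and environment variable are illustrative placeholders, not values taken from the Beam documentation:

```py
import json

from apache_beam.options.pipeline_options import PipelineOptions

# Illustrative PROCESS configuration. "command" is the only required field;
# the path below is a placeholder for an SDK worker boot executable that
# already exists on every worker node.
process_env = {
    "os": "linux",
    "arch": "amd64",
    "command": "/opt/beam/sdk_worker/boot",   # placeholder path
    "env": {"WORKER_LOG_LEVEL": "INFO"},      # placeholder variable
}

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=PROCESS",
    "--environment_config=" + json.dumps(process_env),
])
```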