diff --git a/runners/spark/.gitignore b/runners/spark/.gitignore
new file mode 100644
index 000000000000..0b4510ca11dc
--- /dev/null
+++ b/runners/spark/.gitignore
@@ -0,0 +1,10 @@
+.classpath
+.project
+.settings
+.cache
+target
+*.iml
+.idea
+gen
+.DS_Store
+dependency-reduced-pom.xml
\ No newline at end of file
diff --git a/runners/spark/.travis.yml b/runners/spark/.travis.yml
new file mode 100644
index 000000000000..c4af8a6769a1
--- /dev/null
+++ b/runners/spark/.travis.yml
@@ -0,0 +1,22 @@
+language: java
+sudo: false
+install: mvn ${JAVA} ${SPARK} -DskipTests=true -Dmaven.javadoc.skip=true -B -V install
+script: mvn ${JAVA} ${SPARK} ${JACOCO} -Dmaven.javadoc.skip=true -B verify
+matrix:
+ include:
+ # Covers Java 7, Open JDK, Spark 1.3.x, and code coverage
+ - jdk: openjdk7
+ env: JACOCO=-Pjacoco
+ # Covers Spark 1.4.x
+ - jdk: openjdk7
+ env: SPARK=-Dspark.version=1.4.1
+ # Covers Spark 1.5.x
+ - jdk: openjdk7
+ env: SPARK=-Dspark.version=1.5.1
+ # Covers Java 8, Oracle JDK
+ - jdk: oraclejdk8
+ env: JAVA=-Djava.version=1.8
+cache:
+ directories:
+ - $HOME/.m2
+after_success: if [ -n "$JACOCO" ]; then bash <(curl -s https://codecov.io/bash); fi
diff --git a/runners/spark/CONTRIBUTING.md b/runners/spark/CONTRIBUTING.md
new file mode 100644
index 000000000000..1781a8662b3e
--- /dev/null
+++ b/runners/spark/CONTRIBUTING.md
@@ -0,0 +1,8 @@
+## Licensing
+
+Contributions via GitHub pull requests are gladly accepted from their original author.
+Along with any pull requests, please state that the contribution is your original work and
+that you license the work to the project under the project's open source license.
+Whether or not you state this explicitly, by submitting any copyrighted material via
+pull request, email, or other means you agree to license the material under the project's
+open source license and warrant that you have the legal authority to do so.
diff --git a/runners/spark/LICENSE b/runners/spark/LICENSE
new file mode 100644
index 000000000000..a647a925d538
--- /dev/null
+++ b/runners/spark/LICENSE
@@ -0,0 +1,161 @@
+Apache License
+
+Version 2.0, January 2004
+
+http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document.
+
+"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License.
+
+"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.
+
+"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License.
+
+"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files.
+
+"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types.
+
+"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below).
+
+"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof.
+
+"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution."
+
+"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions:
+
+You must give any other recipients of the Work or Derivative Works a copy of this License; and
+
+You must cause any modified files to carry prominent notices stating that You changed the files; and
+
+You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and
+
+If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work
+To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
+===============================================================================
+
+This project also distributes third-party software licensed under other open-source licenses:
+
+BSD License (http://opensource.org/licenses/BSD-3-Clause):
+
+ JPMML (https://github.com/jpmml/jpmml-model)
+ ParaNamer Core (https://github.com/paul-hammant/paranamer)
+ Protocol Buffer Java API (http://code.google.com/p/protobuf)
+
+MIT License (http://opensource.org/licenses/mit-license.php):
+
+ SLF4J (http://www.slf4j.org/)
+Apache License
+
+Version 2.0, January 2004
+
+http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document.
+
+"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License.
+
+"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.
+
+"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License.
+
+"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files.
+
+"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types.
+
+"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below).
+
+"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof.
+
+"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution."
+
+"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions:
+
+You must give any other recipients of the Work or Derivative Works a copy of this License; and
+
+You must cause any modified files to carry prominent notices stating that You changed the files; and
+
+You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and
+
+If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work
+To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+
diff --git a/runners/spark/README.md b/runners/spark/README.md
new file mode 100644
index 000000000000..ccf85161fade
--- /dev/null
+++ b/runners/spark/README.md
@@ -0,0 +1,113 @@
+spark-dataflow
+==============
+
+## Intro
+
+Spark-dataflow allows users to execute data pipelines written against the Google Cloud Dataflow API
+with Apache Spark. Spark-dataflow is an early prototype, and we'll be working on it continuously.
+If this project interests you, we welcome issues, comments, and (especially!) pull requests.
+To get an idea of what we have already identified as
+areas that need improvement, check out the issues listed in the GitHub repo.
+
+## Motivation
+
+We had two primary goals when we started working on Spark-dataflow:
+
+1. *Provide portability for data pipelines written for Google Cloud Dataflow.* Google makes
+it really easy to get started writing pipelines against the Dataflow API, but they wanted
+to be sure that creating a pipeline using their tools would not lock developers in to their
+platform. A Spark-based implementation of Dataflow means that you can take your pipeline
+logic with you wherever you go. This also means that any new machine learning and anomaly
+detection algorithms that are developed against the Dataflow API are available to everyone,
+regardless of their underlying execution platform.
+
+2. *Experiment with new data pipeline design patterns.* The Dataflow API has a number of
+interesting ideas, especially with respect to the unification of batch and stream data
+processing into a single API that maps into two separate engines. The Dataflow streaming
+engine, based on Google's [Millwheel](http://research.google.com/pubs/pub41378.html), does
+not have a direct open source analogue, and we wanted to understand how to replicate its
+functionality using frameworks like Spark Streaming.
+
+## Getting Started
+
+The Maven coordinates of the current version of this project are:
+
+    <groupId>com.cloudera.dataflow.spark</groupId>
+    <artifactId>spark-dataflow</artifactId>
+    <version>0.4.2</version>
+
+and are hosted in Cloudera's repository at:
+
+    <repository>
+      <id>cloudera.repo</id>
+      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+    </repository>
+
+If we wanted to run a dataflow pipeline with the default options of a single threaded spark
+instance in local mode, we would do the following:
+
+    Pipeline p = <logic for pipeline creation>
+ EvaluationResult result = SparkPipelineRunner.create().run(p);
+
+To create a pipeline runner to run against a different spark cluster, with a custom master url we
+would do the following:
+
+    Pipeline p = <logic for pipeline creation>
+ SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
+ options.setSparkMaster("spark://host:port");
+ EvaluationResult result = SparkPipelineRunner.create(options).run(p);
+
+## Word Count Example
+
+First download a text document to use as input:
+
+ curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt > /tmp/kinglear.txt
+
+Then run the [word count example][wc] from the SDK using a single threaded Spark instance
+in local mode:
+
+ mvn exec:exec -DmainClass=com.google.cloud.dataflow.examples.WordCount \
+ -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkPipelineRunner \
+ -DsparkMaster=local
+
+Check the output by running:
+
+ head /tmp/out-00000-of-00001
+
+__Note: running examples using `mvn exec:exec` only works for Spark local mode at the
+moment. See the next section for how to run on a cluster.__
+
+[wc]: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/WordCount.java
+
+## Running on a Cluster
+
+Spark Dataflow pipelines can be run on a cluster using the `spark-submit` command.
+
+First copy a text document to HDFS:
+
+ curl http://www.gutenberg.org/cache/epub/1128/pg1128.txt | hadoop fs -put - kinglear.txt
+
+Then run the word count example using Spark submit with the `yarn-client` master
+(`yarn-cluster` works just as well):
+
+ spark-submit \
+ --class com.google.cloud.dataflow.examples.WordCount \
+ --master yarn-client \
+ target/spark-dataflow-*-spark-app.jar \
+ --inputFile=kinglear.txt --output=out --runner=SparkPipelineRunner --sparkMaster=yarn-client
+
+Check the output by running:
+
+ hadoop fs -tail out-00000-of-00002
+
+## How to Release
+
+Committers can release the project using the standard [Maven Release Plugin](http://maven.apache.org/maven-release/maven-release-plugin/) commands:
+
+ mvn release:prepare
+ mvn release:perform -Darguments="-Dgpg.passphrase=XXX"
+
+Note that you will need a [public GPG key](http://www.apache.org/dev/openpgp.html).
+
+[![Build Status](https://travis-ci.org/cloudera/spark-dataflow.svg?branch=master)](https://travis-ci.org/cloudera/spark-dataflow)
+[![codecov.io](https://codecov.io/github/cloudera/spark-dataflow/coverage.svg?branch=master)](https://codecov.io/github/cloudera/spark-dataflow?branch=master)
diff --git a/runners/spark/build-resources/checkstyle.xml b/runners/spark/build-resources/checkstyle.xml
new file mode 100644
index 000000000000..c5b884d7a9dc
--- /dev/null
+++ b/runners/spark/build-resources/checkstyle.xml
@@ -0,0 +1,222 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/runners/spark/build-resources/header-file.txt b/runners/spark/build-resources/header-file.txt
new file mode 100644
index 000000000000..6d81b4dc254a
--- /dev/null
+++ b/runners/spark/build-resources/header-file.txt
@@ -0,0 +1,14 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
\ No newline at end of file
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
new file mode 100644
index 000000000000..399e9e77ad34
--- /dev/null
+++ b/runners/spark/pom.xml
@@ -0,0 +1,469 @@
+
+
+
+ 4.0.0
+ Dataflow on Spark
+ com.cloudera.dataflow.spark
+ spark-dataflow
+ 0.4.3-SNAPSHOT
+ jar
+
+
+ UTF-8
+ UTF-8
+ 1.7
+ 1.5.2
+ 1.3.0
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.5
+
+ ${java.version}
+ ${java.version}
+
+ -Xlint:all,-serial
+
+
+
+
+ org.apache.maven.plugins
+ maven-surefire-plugin
+ 2.19.1
+
+ 1
+ false
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+ 2.17
+
+ build-resources/header-file.txt
+ build-resources/checkstyle.xml
+
+
+
+ validate
+ validate
+
+ check
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+ 2.4
+
+
+ attach-sources
+
+ jar-no-fork
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+ 2.10.3
+
+ ${java.version}
+
+
+
+ attach-javadocs
+
+ jar
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-clean-plugin
+ 3.0.0
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ 2.8.2
+
+
+ org.apache.maven.plugins
+ maven-install-plugin
+ 2.5.2
+
+ true
+
+
+
+ org.apache.maven.plugins
+ maven-resources-plugin
+ 2.7
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 2.6
+
+
+ org.apache.maven.plugins
+ maven-site-plugin
+ 3.4
+
+
+ org.apache.maven.plugins
+ maven-release-plugin
+ 2.5.3
+
+
+ org.apache.maven.scm
+ maven-scm-provider-gitexe
+ 1.9.2
+
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+ 1.6
+
+
+ sign-artifacts
+ verify
+
+ sign
+
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+ 0.11
+
+
+ .travis.yml
+ **/*.conf
+ **/*.iml
+ **/*.md
+ **/*.txt
+ **/gen/**
+ **/resources/**
+ **/target/**
+ **/dependency-reduced-pom.xml
+ false
+
+
+
+
+ verify
+ verify
+
+ check
+
+
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ 1.4.0
+
+ java
+ test
+
+ -classpath
+
+ ${mainClass}
+ --inputFile=${input}
+ --output=${output}
+ --runner=${runner}
+ --sparkMaster=${sparkMaster}
+
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+ 0.7.5.201505241946
+
+
+
+ prepare-agent
+
+
+
+ report
+ test
+
+ report
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+ 2.4.3
+
+
+ package
+
+ shade
+
+
+
+
+
+ com.google.common
+ com.cloudera.dataflow.spark.relocated.com.google.common
+
+
+ true
+ spark-app
+
+
+
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+ org.apache.maven.plugins
+ maven-source-plugin
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+
+
+
+
+ org.apache.spark
+ spark-core_2.10
+ ${spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-streaming_2.10
+ ${spark.version}
+ provided
+
+
+ org.apache.spark
+ spark-streaming-kafka_2.10
+ ${spark.version}
+ provided
+
+
+ org.apache.kafka
+ kafka_2.10
+ 0.8.2.1
+ provided
+
+
+ com.google.guava
+ guava
+ 18.0
+
+
+ com.google.cloud.dataflow
+ google-cloud-dataflow-java-sdk-all
+ ${google-cloud-dataflow-version}
+
+
+
+ org.slf4j
+ slf4j-jdk14
+
+
+
+
+ com.google.cloud.dataflow
+ google-cloud-dataflow-java-examples-all
+ ${google-cloud-dataflow-version}
+
+
+
+ org.slf4j
+ slf4j-jdk14
+
+
+
+
+ org.apache.avro
+ avro-mapred
+ 1.7.7
+ hadoop2
+
+
+
+ org.mortbay.jetty
+ servlet-api
+
+
+
+
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+ org.hamcrest
+ hamcrest-all
+ 1.3
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+
+
+ aggregate
+
+ aggregate
+ test-aggregate
+
+
+
+
+
+
+
+
+ http://github.com/cloudera/spark-dataflow
+ 2014
+
+
+ The Apache Software License, Version 2.0
+ http://www.apache.org/licenses/LICENSE-2.0.txt
+ repo
+
+
+
+
+ Cloudera, Inc.
+
+
+
+
+ GitHub
+ https://github.com/cloudera/spark-dataflow/issues
+
+
+ scm:git:https://github.com/cloudera/spark-dataflow.git
+ scm:git:https://github.com/cloudera/spark-dataflow.git
+ scm:git:https://github.com/cloudera/spark-dataflow.git
+ HEAD
+
+
+
+ 3.2.1
+
+
+
+
+ cloudera.repo
+ https://repository.cloudera.com/artifactory/cloudera-repos
+ Cloudera Repositories
+
+ true
+
+
+ true
+
+
+
+
+
+
+ cloudera.repo
+ https://repository.cloudera.com/artifactory/libs-release-local
+
+
+ cloudera.snapshots.repo
+ https://repository.cloudera.com/artifactory/libs-snapshot-local
+
+
+
+
+
+ release-sign-artifacts
+
+
+ performRelease
+ true
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-gpg-plugin
+
+
+
+
+
+ jacoco
+
+
+
+ org.jacoco
+ jacoco-maven-plugin
+
+
+
+
+
+
+
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
new file mode 100644
index 000000000000..c79f2113cc51
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/HadoopIO.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.hadoop;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.cloud.dataflow.sdk.io.ShardNameTemplate;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import com.cloudera.dataflow.spark.ShardNameTemplateAware;
+
+public final class HadoopIO {
+
+ private HadoopIO() {
+ }
+
+ public static final class Read {
+
+ private Read() {
+ }
+
+ public static Bound from(String filepattern,
+ Class extends FileInputFormat> format, Class key, Class value) {
+ return new Bound<>(filepattern, format, key, value);
+ }
+
+ public static class Bound extends PTransform>> {
+
+ private final String filepattern;
+ private final Class extends FileInputFormat> formatClass;
+ private final Class keyClass;
+ private final Class valueClass;
+
+ Bound(String filepattern, Class extends FileInputFormat> format, Class key,
+ Class value) {
+ Preconditions.checkNotNull(filepattern,
+ "need to set the filepattern of an HadoopIO.Read transform");
+ Preconditions.checkNotNull(format,
+ "need to set the format class of an HadoopIO.Read transform");
+ Preconditions.checkNotNull(key,
+ "need to set the key class of an HadoopIO.Read transform");
+ Preconditions.checkNotNull(value,
+ "need to set the value class of an HadoopIO.Read transform");
+ this.filepattern = filepattern;
+ this.formatClass = format;
+ this.keyClass = key;
+ this.valueClass = value;
+ }
+
+ public String getFilepattern() {
+ return filepattern;
+ }
+
+ public Class extends FileInputFormat> getFormatClass() {
+ return formatClass;
+ }
+
+ public Class getValueClass() {
+ return valueClass;
+ }
+
+ public Class getKeyClass() {
+ return keyClass;
+ }
+
+ @Override
+ public PCollection> apply(PInput input) {
+ return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+ WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED);
+ }
+
+ }
+
+ }
+
+ public static final class Write {
+
+ private Write() {
+ }
+
+ public static Bound to(String filenamePrefix,
+ Class extends FileOutputFormat> format, Class key, Class value) {
+ return new Bound<>(filenamePrefix, format, key, value);
+ }
+
+ public static class Bound extends PTransform>, PDone> {
+
+ /** The filename to write to. */
+ private final String filenamePrefix;
+ /** Suffix to use for each filename. */
+ private final String filenameSuffix;
+ /** Requested number of shards. 0 for automatic. */
+ private final int numShards;
+ /** Shard template string. */
+ private final String shardTemplate;
+ private final Class extends FileOutputFormat> formatClass;
+ private final Class keyClass;
+ private final Class valueClass;
+ private final Map configurationProperties;
+
+ Bound(String filenamePrefix, Class extends FileOutputFormat> format,
+ Class key,
+ Class value) {
+ this(filenamePrefix, "", 0, ShardNameTemplate.INDEX_OF_MAX, format, key, value,
+ new HashMap());
+ }
+
+ Bound(String filenamePrefix, String filenameSuffix, int numShards,
+ String shardTemplate, Class extends FileOutputFormat> format,
+ Class key, Class value, Map configurationProperties) {
+ this.filenamePrefix = filenamePrefix;
+ this.filenameSuffix = filenameSuffix;
+ this.numShards = numShards;
+ this.shardTemplate = shardTemplate;
+ this.formatClass = format;
+ this.keyClass = key;
+ this.valueClass = value;
+ this.configurationProperties = configurationProperties;
+ }
+
+ public Bound withoutSharding() {
+ return new Bound<>(filenamePrefix, filenameSuffix, 1, "", formatClass,
+ keyClass, valueClass, configurationProperties);
+ }
+
+ public Bound withConfigurationProperty(String key, String value) {
+ configurationProperties.put(key, value);
+ return this;
+ }
+
+ public String getFilenamePrefix() {
+ return filenamePrefix;
+ }
+
+ public String getShardTemplate() {
+ return shardTemplate;
+ }
+
+ public int getNumShards() {
+ return numShards;
+ }
+
+ public String getFilenameSuffix() {
+ return filenameSuffix;
+ }
+
+ public Class extends FileOutputFormat> getFormatClass() {
+ return formatClass;
+ }
+
+ public Class getValueClass() {
+ return valueClass;
+ }
+
+ public Class getKeyClass() {
+ return keyClass;
+ }
+
+ public Map getConfigurationProperties() {
+ return configurationProperties;
+ }
+
+ @Override
+ public PDone apply(PCollection> input) {
+ Preconditions.checkNotNull(filenamePrefix,
+ "need to set the filename prefix of an HadoopIO.Write transform");
+ Preconditions.checkNotNull(formatClass,
+ "need to set the format class of an HadoopIO.Write transform");
+ Preconditions.checkNotNull(keyClass,
+ "need to set the key class of an HadoopIO.Write transform");
+ Preconditions.checkNotNull(valueClass,
+ "need to set the value class of an HadoopIO.Write transform");
+
+ Preconditions.checkArgument(ShardNameTemplateAware.class.isAssignableFrom(formatClass),
+ "Format class must implement " + ShardNameTemplateAware.class.getName());
+
+ return PDone.in(input.getPipeline());
+ }
+ }
+ }
+}
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/NullWritableCoder.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/NullWritableCoder.java
new file mode 100644
index 000000000000..5e5d3919ac53
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/NullWritableCoder.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.dataflow.hadoop;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import org.apache.hadoop.io.NullWritable;
+
+public final class NullWritableCoder extends WritableCoder {
+ private static final long serialVersionUID = 1L;
+
+ @JsonCreator
+ public static NullWritableCoder of() {
+ return INSTANCE;
+ }
+
+ private static final NullWritableCoder INSTANCE = new NullWritableCoder();
+
+ private NullWritableCoder() {
+ super(NullWritable.class);
+ }
+
+ @Override
+ public void encode(NullWritable value, OutputStream outStream, Context context) {
+ // nothing to write
+ }
+
+ @Override
+ public NullWritable decode(InputStream inStream, Context context) {
+ return NullWritable.get();
+ }
+
+ @Override
+ public boolean consistentWithEquals() {
+ return true;
+ }
+
+ /**
+ * Returns true since registerByteSizeObserver() runs in constant time.
+ */
+ @Override
+ public boolean isRegisterByteSizeObserverCheap(NullWritable value, Context context) {
+ return true;
+ }
+
+ @Override
+ protected long getEncodedElementByteSize(NullWritable value, Context context) {
+ return 0;
+ }
+
+ @Override
+ public void verifyDeterministic() throws Coder.NonDeterministicException {
+ // NullWritableCoder is deterministic
+ }
+}
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java
new file mode 100644
index 000000000000..324b203a2f15
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/hadoop/WritableCoder.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.dataflow.hadoop;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.StandardCoder;
+import com.google.cloud.dataflow.sdk.util.CloudObject;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A {@code WritableCoder} is a {@link Coder} for a Java class that implements {@link Writable}.
+ *
+ *
To use, specify the coder type on a PCollection:
+ *
+ *
+ * @param the type of elements handled by this coder
+ */
+public class WritableCoder extends StandardCoder {
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Returns a {@code WritableCoder} instance for the provided element class.
+ * @param the element type
+ * @param clazz the element class
+ * @return a {@code WritableCoder} instance for the provided element class
+ */
+ public static WritableCoder of(Class clazz) {
+ if (clazz.equals(NullWritable.class)) {
+ @SuppressWarnings("unchecked")
+ WritableCoder result = (WritableCoder) NullWritableCoder.of();
+ return result;
+ }
+ return new WritableCoder<>(clazz);
+ }
+
+ @JsonCreator
+ @SuppressWarnings("unchecked")
+ public static WritableCoder> of(@JsonProperty("type") String classType)
+ throws ClassNotFoundException {
+ Class> clazz = Class.forName(classType);
+ if (!Writable.class.isAssignableFrom(clazz)) {
+ throw new ClassNotFoundException(
+ "Class " + classType + " does not implement Writable");
+ }
+ return of((Class extends Writable>) clazz);
+ }
+
+ private final Class type;
+
+ public WritableCoder(Class type) {
+ this.type = type;
+ }
+
+ @Override
+ public void encode(T value, OutputStream outStream, Context context) throws IOException {
+ value.write(new DataOutputStream(outStream));
+ }
+
+ @Override
+ public T decode(InputStream inStream, Context context) throws IOException {
+ try {
+ T t = type.getConstructor().newInstance();
+ t.readFields(new DataInputStream(inStream));
+ return t;
+ } catch (NoSuchMethodException | InstantiationException | IllegalAccessException e) {
+ throw new CoderException("unable to deserialize record", e);
+ } catch (InvocationTargetException ite) {
+ throw new CoderException("unable to deserialize record", ite.getCause());
+ }
+ }
+
+ @Override
+ public List> getCoderArguments() {
+ return null;
+ }
+
+ @Override
+ public CloudObject asCloudObject() {
+ CloudObject result = super.asCloudObject();
+ result.put("type", type.getName());
+ return result;
+ }
+
+ @Override
+ public void verifyDeterministic() throws Coder.NonDeterministicException {
+ throw new NonDeterministicException(this,
+ "Hadoop Writable may be non-deterministic.");
+ }
+
+}
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java
new file mode 100644
index 000000000000..bc19b39068a5
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/io/ConsoleIO.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.io;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+/**
+ * Print to console.
+ */
+public final class ConsoleIO {
+
+ private ConsoleIO() {
+ }
+
+ public static final class Write {
+
+ private Write() {
+ }
+
+ public static Unbound from() {
+ return new Unbound<>(10);
+ }
+
+ public static Unbound from(int num) {
+ return new Unbound<>(num);
+ }
+
+ public static class Unbound extends PTransform, PDone> {
+
+ private final int num;
+
+ Unbound(int num) {
+ this.num = num;
+ }
+
+ public int getNum() {
+ return num;
+ }
+
+ @Override
+ public PDone apply(PCollection input) {
+ return PDone.in(input.getPipeline());
+ }
+ }
+ }
+}
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java b/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java
new file mode 100644
index 000000000000..9a9927873222
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/io/CreateStream.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.io;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.common.base.Preconditions;
+
+/**
+ * Create an input stream from Queue.
+ *
+ * @param stream type
+ */
+public final class CreateStream {
+
+ private CreateStream() {
+ }
+
+ /**
+ * Define the input stream to create from queue.
+ *
+ * @param queuedValues defines the input stream
+ * @param stream type
+ * @return the queue that defines the input stream
+ */
+ public static QueuedValues fromQueue(Iterable> queuedValues) {
+ return new QueuedValues<>(queuedValues);
+ }
+
+ public static final class QueuedValues extends PTransform> {
+
+ private final Iterable> queuedValues;
+
+ QueuedValues(Iterable> queuedValues) {
+ Preconditions.checkNotNull(queuedValues,
+ "need to set the queuedValues of an Create.QueuedValues transform");
+ this.queuedValues = queuedValues;
+ }
+
+ public Iterable> getQueuedValues() {
+ return queuedValues;
+ }
+
+ @Override
+ public PCollection apply(PInput input) {
+ // Spark streaming micro batches are bounded by default
+ return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+ WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
+ }
+ }
+
+}
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java b/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java
new file mode 100644
index 000000000000..154e6dacf37e
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/io/KafkaIO.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright (c) 2015, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.io;
+
+import java.util.Map;
+import java.util.Set;
+
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.common.base.Preconditions;
+
+import kafka.serializer.Decoder;
+
+/**
+ * Read stream from Kafka.
+ */
+public final class KafkaIO {
+
+ private KafkaIO() {
+ }
+
+ public static final class Read {
+
+ private Read() {
+ }
+
+ /**
+ * Define the Kafka consumption.
+ *
+ * @param keyDecoder {@link Decoder} to decode the Kafka message key
+ * @param valueDecoder {@link Decoder} to decode the Kafka message value
+ * @param key Kafka message key Class
+ * @param value Kafka message value Class
+ * @param topics Kafka topics to subscribe
+ * @param kafkaParams map of Kafka parameters
+ * @param Kafka message key Class type
+ * @param Kafka message value Class type
+ * @return KafkaIO Unbound input
+ */
+ public static Unbound from(Class extends Decoder> keyDecoder,
+ Class extends Decoder> valueDecoder,
+ Class key,
+ Class value, Set topics,
+ Map kafkaParams) {
+ return new Unbound<>(keyDecoder, valueDecoder, key, value, topics, kafkaParams);
+ }
+
+ public static class Unbound extends PTransform>> {
+
+ private final Class extends Decoder> keyDecoderClass;
+ private final Class extends Decoder> valueDecoderClass;
+ private final Class keyClass;
+ private final Class valueClass;
+ private final Set topics;
+ private final Map kafkaParams;
+
+ Unbound(Class extends Decoder> keyDecoder,
+ Class extends Decoder> valueDecoder, Class key,
+ Class value, Set topics, Map kafkaParams) {
+ Preconditions.checkNotNull(keyDecoder,
+ "need to set the key decoder class of a KafkaIO.Read transform");
+ Preconditions.checkNotNull(valueDecoder,
+ "need to set the value decoder class of a KafkaIO.Read transform");
+ Preconditions.checkNotNull(key,
+ "need to set the key class of aKafkaIO.Read transform");
+ Preconditions.checkNotNull(value,
+ "need to set the value class of a KafkaIO.Read transform");
+ Preconditions.checkNotNull(topics,
+ "need to set the topics of a KafkaIO.Read transform");
+ Preconditions.checkNotNull(kafkaParams,
+ "need to set the kafkaParams of a KafkaIO.Read transform");
+ this.keyDecoderClass = keyDecoder;
+ this.valueDecoderClass = valueDecoder;
+ this.keyClass = key;
+ this.valueClass = value;
+ this.topics = topics;
+ this.kafkaParams = kafkaParams;
+ }
+
+ public Class extends Decoder> getKeyDecoderClass() {
+ return keyDecoderClass;
+ }
+
+ public Class extends Decoder> getValueDecoderClass() {
+ return valueDecoderClass;
+ }
+
+ public Class getValueClass() {
+ return valueClass;
+ }
+
+ public Class getKeyClass() {
+ return keyClass;
+ }
+
+ public Set getTopics() {
+ return topics;
+ }
+
+ public Map getKafkaParams() {
+ return kafkaParams;
+ }
+
+ @Override
+ public PCollection> apply(PInput input) {
+ // Spark streaming micro batches are bounded by default
+ return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+ WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
+ }
+ }
+
+ }
+}
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
new file mode 100644
index 000000000000..8dca939c5247
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/BroadcastHelper.java
@@ -0,0 +1,121 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.dataflow.spark;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.Serializable;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class BroadcastHelper implements Serializable {
+
+ /**
+ * If the property {@code dataflow.spark.directBroadcast} is set to
+ * {@code true} then Spark serialization (Kryo) will be used to broadcast values
+ * in View objects. By default this property is not set, and values are coded using
+ * the appropriate {@link Coder}.
+ */
+ public static final String DIRECT_BROADCAST = "dataflow.spark.directBroadcast";
+
+ private static final Logger LOG = LoggerFactory.getLogger(BroadcastHelper.class);
+
+ public static BroadcastHelper create(T value, Coder coder) {
+ if (Boolean.parseBoolean(System.getProperty(DIRECT_BROADCAST, "false"))) {
+ return new DirectBroadcastHelper<>(value);
+ }
+ return new CodedBroadcastHelper<>(value, coder);
+ }
+
+ public abstract T getValue();
+
+ public abstract void broadcast(JavaSparkContext jsc);
+
+ /**
+ * A {@link com.cloudera.dataflow.spark.BroadcastHelper} that relies on the underlying
+ * Spark serialization (Kryo) to broadcast values. This is appropriate when
+ * broadcasting very large values, since no copy of the object is made.
+ * @param
+ */
+ static class DirectBroadcastHelper extends BroadcastHelper {
+ private Broadcast bcast;
+ private transient T value;
+
+ DirectBroadcastHelper(T value) {
+ this.value = value;
+ }
+
+ @Override
+ public synchronized T getValue() {
+ if (value == null) {
+ value = bcast.getValue();
+ }
+ return value;
+ }
+
+ @Override
+ public void broadcast(JavaSparkContext jsc) {
+ this.bcast = jsc.broadcast(value);
+ }
+ }
+
+ /**
+ * A {@link com.cloudera.dataflow.spark.BroadcastHelper} that uses a
+ * {@link Coder} to encode values as byte arrays
+ * before broadcasting.
+ * @param
+ */
+ static class CodedBroadcastHelper extends BroadcastHelper {
+ private Broadcast bcast;
+ private final Coder coder;
+ private transient T value;
+
+ CodedBroadcastHelper(T value, Coder coder) {
+ this.value = value;
+ this.coder = coder;
+ }
+
+ @Override
+ public synchronized T getValue() {
+ if (value == null) {
+ value = deserialize();
+ }
+ return value;
+ }
+
+ @Override
+ public void broadcast(JavaSparkContext jsc) {
+ this.bcast = jsc.broadcast(CoderHelpers.toByteArray(value, coder));
+ }
+
+ private T deserialize() {
+ T val;
+ try {
+ val = coder.decode(new ByteArrayInputStream(bcast.value()),
+ new Coder.Context(true));
+ } catch (IOException ioe) {
+ // this should not ever happen, log it if it does.
+ LOG.warn(ioe.getMessage());
+ val = null;
+ }
+ return val;
+ }
+ }
+}
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java
new file mode 100644
index 000000000000..06db57206d43
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/ByteArray.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package com.cloudera.dataflow.spark;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+import com.google.common.primitives.UnsignedBytes;
+
+class ByteArray implements Serializable, Comparable {
+
+ private final byte[] value;
+
+ ByteArray(byte[] value) {
+ this.value = value;
+ }
+
+ public byte[] getValue() {
+ return value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ByteArray byteArray = (ByteArray) o;
+ return Arrays.equals(value, byteArray.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return value != null ? Arrays.hashCode(value) : 0;
+ }
+
+ @Override
+ public int compareTo(ByteArray other) {
+ return UnsignedBytes.lexicographicalComparator().compare(value, other.value);
+ }
+}
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java
new file mode 100644
index 000000000000..0ae06c1a9e49
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/CoderHelpers.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.dataflow.spark;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.common.collect.Iterables;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import scala.Tuple2;
+
+/**
+ * Serialization utility class.
+ */
+public final class CoderHelpers {
+ private CoderHelpers() {
+ }
+
+ /**
+ * Utility method for serializing an object using the specified coder.
+ *
+ * @param value Value to serialize.
+ * @param coder Coder to serialize with.
+ * @param type of value that is serialized
+ * @return Byte array representing serialized object.
+ */
+ static byte[] toByteArray(T value, Coder coder) {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try {
+ coder.encode(value, baos, new Coder.Context(true));
+ } catch (IOException e) {
+ throw new IllegalStateException("Error encoding value: " + value, e);
+ }
+ return baos.toByteArray();
+ }
+
+ /**
+ * Utility method for serializing a Iterable of values using the specified coder.
+ *
+ * @param values Values to serialize.
+ * @param coder Coder to serialize with.
+ * @param type of value that is serialized
+ * @return List of bytes representing serialized objects.
+ */
+ static List toByteArrays(Iterable values, Coder coder) {
+ List res = new LinkedList<>();
+ for (T value : values) {
+ res.add(toByteArray(value, coder));
+ }
+ return res;
+ }
+
+ /**
+ * Utility method for deserializing a byte array using the specified coder.
+ *
+ * @param serialized bytearray to be deserialized.
+ * @param coder Coder to deserialize with.
+ * @param Type of object to be returned.
+ * @return Deserialized object.
+ */
+ static T fromByteArray(byte[] serialized, Coder coder) {
+ ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ try {
+ return coder.decode(bais, new Coder.Context(true));
+ } catch (IOException e) {
+ throw new IllegalStateException("Error decoding bytes for coder: " + coder, e);
+ }
+ }
+
+ /**
+ * A function wrapper for converting an object to a bytearray.
+ *
+ * @param coder Coder to serialize with.
+ * @param The type of the object being serialized.
+ * @return A function that accepts an object and returns its coder-serialized form.
+ */
+ static Function toByteFunction(final Coder coder) {
+ return new Function() {
+ @Override
+ public byte[] call(T t) throws Exception {
+ return toByteArray(t, coder);
+ }
+ };
+ }
+
+ /**
+ * A function wrapper for converting a byte array to an object.
+ *
+ * @param coder Coder to deserialize with.
+ * @param The type of the object being deserialized.
+ * @return A function that accepts a byte array and returns its corresponding object.
+ */
+ static Function fromByteFunction(final Coder coder) {
+ return new Function() {
+ @Override
+ public T call(byte[] bytes) throws Exception {
+ return fromByteArray(bytes, coder);
+ }
+ };
+ }
+
+ /**
+ * A function wrapper for converting a key-value pair to a byte array pair.
+ *
+ * @param keyCoder Coder to serialize keys.
+ * @param valueCoder Coder to serialize values.
+ * @param The type of the key being serialized.
+ * @param The type of the value being serialized.
+ * @return A function that accepts a key-value pair and returns a pair of byte arrays.
+ */
+ static PairFunction, ByteArray, byte[]> toByteFunction(
+ final Coder keyCoder, final Coder valueCoder) {
+ return new PairFunction, ByteArray, byte[]>() {
+ @Override
+ public Tuple2 call(Tuple2 kv) {
+ return new Tuple2<>(new ByteArray(toByteArray(kv._1(), keyCoder)), toByteArray(kv._2(),
+ valueCoder));
+ }
+ };
+ }
+
+ /**
+ * A function wrapper for converting a byte array pair to a key-value pair.
+ *
+ * @param keyCoder Coder to deserialize keys.
+ * @param valueCoder Coder to deserialize values.
+ * @param The type of the key being deserialized.
+ * @param The type of the value being deserialized.
+ * @return A function that accepts a pair of byte arrays and returns a key-value pair.
+ */
+ static PairFunction, K, V> fromByteFunction(
+ final Coder keyCoder, final Coder valueCoder) {
+ return new PairFunction, K, V>() {
+ @Override
+ public Tuple2 call(Tuple2 tuple) {
+ return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
+ fromByteArray(tuple._2(), valueCoder));
+ }
+ };
+ }
+
+ /**
+ * A function wrapper for converting a byte array pair to a key-value pair, where
+ * values are {@link Iterable}.
+ *
+ * @param keyCoder Coder to deserialize keys.
+ * @param valueCoder Coder to deserialize values.
+ * @param The type of the key being deserialized.
+ * @param The type of the value being deserialized.
+ * @return A function that accepts a pair of byte arrays and returns a key-value pair.
+ */
+ static PairFunction>, K, Iterable>
+ fromByteFunctionIterable(final Coder keyCoder, final Coder valueCoder) {
+ return new PairFunction>, K, Iterable>() {
+ @Override
+ public Tuple2> call(Tuple2> tuple) {
+ return new Tuple2<>(fromByteArray(tuple._1().getValue(), keyCoder),
+ Iterables.transform(tuple._2(), new com.google.common.base.Function() {
+ @Override
+ public V apply(byte[] bytes) {
+ return fromByteArray(bytes, valueCoder);
+ }
+ }));
+ }
+ };
+ }
+}
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
new file mode 100644
index 000000000000..2bcfec3dfc43
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/DoFnFunction.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.dataflow.spark;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.TupleTag;
+import org.apache.spark.api.java.function.FlatMapFunction;
+
+/**
+ * Dataflow's Do functions correspond to Spark's FlatMap functions.
+ *
+ * @param <I> Input element type.
+ * @param <O> Output element type.
+ */
+public class DoFnFunction<I, O> implements FlatMapFunction<Iterator<WindowedValue<I>>,
+    WindowedValue<O>> {
+  private final DoFn<I, O> mFunction;
+  private final SparkRuntimeContext mRuntimeContext;
+  private final Map<TupleTag<?>, BroadcastHelper<?>> mSideInputs;
+
+  /**
+   * @param fn         DoFunction to be wrapped.
+   * @param runtime    Runtime to apply function in.
+   * @param sideInputs Side inputs used in DoFunction.
+   */
+  public DoFnFunction(DoFn<I, O> fn,
+                      SparkRuntimeContext runtime,
+                      Map<TupleTag<?>, BroadcastHelper<?>> sideInputs) {
+    this.mFunction = fn;
+    this.mRuntimeContext = runtime;
+    this.mSideInputs = sideInputs;
+  }
+
+  @Override
+  public Iterable<WindowedValue<O>> call(Iterator<WindowedValue<I>> iter) throws
+      Exception {
+    ProcCtxt ctxt = new ProcCtxt(mFunction, mRuntimeContext, mSideInputs);
+    ctxt.setup();
+    mFunction.startBundle(ctxt);
+    return ctxt.getOutputIterable(iter, mFunction);
+  }
+
+  private class ProcCtxt extends SparkProcessContext<I, O, WindowedValue<O>> {
+
+    private final List<WindowedValue<O>> outputs = new LinkedList<>();
+
+    ProcCtxt(DoFn<I, O> fn, SparkRuntimeContext runtimeContext, Map<TupleTag<?>,
+        BroadcastHelper<?>> sideInputs) {
+      super(fn, runtimeContext, sideInputs);
+    }
+
+    @Override
+    public synchronized void output(O o) {
+      outputs.add(windowedValue != null ? windowedValue.withValue(o) :
+          WindowedValue.valueInEmptyWindows(o));
+    }
+
+    @Override
+    public synchronized void output(WindowedValue<O> o) {
+      outputs.add(o);
+    }
+
+    @Override
+    protected void clearOutput() {
+      outputs.clear();
+    }
+
+    @Override
+    protected Iterator<WindowedValue<O>> getOutputIterator() {
+      return outputs.iterator();
+    }
+  }
+
+}
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
new file mode 100644
index 000000000000..a6ac6c2f3e86
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationContext.java
@@ -0,0 +1,283 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.dataflow.spark;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.runners.AggregatorRetrievalException;
+import com.google.cloud.dataflow.sdk.runners.AggregatorValues;
+import com.google.cloud.dataflow.sdk.transforms.Aggregator;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.PInput;
+import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.cloud.dataflow.sdk.values.PValue;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.spark.api.java.JavaRDDLike;
+import org.apache.spark.api.java.JavaSparkContext;
+
+
+/**
+ * The evaluation context allows pipeline instructions to be evaluated and their results retrieved.
+ */
+public class EvaluationContext implements EvaluationResult {
+  private final JavaSparkContext jsc;
+  private final Pipeline pipeline;
+  private final SparkRuntimeContext runtime;
+  private final Map<PValue, RDDHolder<?>> pcollections = new LinkedHashMap<>();
+  private final Set<RDDHolder<?>> leafRdds = new LinkedHashSet<>();
+  private final Set<PValue> multireads = new LinkedHashSet<>();
+  private final Map<PValue, Object> pobjects = new LinkedHashMap<>();
+  private final Map<PValue, Iterable<? extends WindowedValue<?>>> pview = new LinkedHashMap<>();
+  protected AppliedPTransform<?, ?, ?> currentTransform;
+
+  public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
+    this.jsc = jsc;
+    this.pipeline = pipeline;
+    this.runtime = new SparkRuntimeContext(jsc, pipeline);
+  }
+
+  /**
+   * Holds an RDD or values for deferred conversion to an RDD if needed. PCollections are
+   * sometimes created from a collection of objects (using RDD parallelize) and then
+   * only used to create View objects; in which case they do not need to be
+   * converted to bytes since they are not transferred across the network until they are
+   * broadcast.
+   */
+  private class RDDHolder<T> {
+
+    private Iterable<T> values;
+    private Coder<T> coder;
+    private JavaRDDLike<WindowedValue<T>, ?> rdd;
+
+    RDDHolder(Iterable<T> values, Coder<T> coder) {
+      this.values = values;
+      this.coder = coder;
+    }
+
+    RDDHolder(JavaRDDLike<WindowedValue<T>, ?> rdd) {
+      this.rdd = rdd;
+    }
+
+    JavaRDDLike<WindowedValue<T>, ?> getRDD() {
+      if (rdd == null) {
+        Iterable<WindowedValue<T>> windowedValues = Iterables.transform(values,
+            new Function<T, WindowedValue<T>>() {
+              @Override
+              public WindowedValue<T> apply(T t) {
+                // TODO: this is wrong if T is a TimestampedValue
+                return WindowedValue.valueInEmptyWindows(t);
+              }
+            });
+        WindowedValue.ValueOnlyWindowedValueCoder<T> windowCoder =
+            WindowedValue.getValueOnlyCoder(coder);
+        rdd = jsc.parallelize(CoderHelpers.toByteArrays(windowedValues, windowCoder))
+            .map(CoderHelpers.fromByteFunction(windowCoder));
+      }
+      return rdd;
+    }
+
+    Iterable<T> getValues(PCollection<T> pcollection) {
+      if (values == null) {
+        coder = pcollection.getCoder();
+        JavaRDDLike<byte[], ?> bytesRDD = rdd.map(WindowingHelpers.<T>unwindowFunction())
+            .map(CoderHelpers.toByteFunction(coder));
+        List<byte[]> clientBytes = bytesRDD.collect();
+        values = Iterables.transform(clientBytes, new Function<byte[], T>() {
+          @Override
+          public T apply(byte[] bytes) {
+            return CoderHelpers.fromByteArray(bytes, coder);
+          }
+        });
+      }
+      return values;
+    }
+
+    Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
+      return Iterables.transform(get(pcollection), new Function<T, WindowedValue<T>>() {
+        @Override
+        public WindowedValue<T> apply(T t) {
+          return WindowedValue.valueInEmptyWindows(t); // TODO: not the right place?
+        }
+      });
+    }
+  }
+
+  protected JavaSparkContext getSparkContext() {
+    return jsc;
+  }
+
+  protected Pipeline getPipeline() {
+    return pipeline;
+  }
+
+  protected SparkRuntimeContext getRuntimeContext() {
+    return runtime;
+  }
+
+  protected void setCurrentTransform(AppliedPTransform<?, ?, ?> transform) {
+    this.currentTransform = transform;
+  }
+
+  protected AppliedPTransform<?, ?, ?> getCurrentTransform() {
+    return currentTransform;
+  }
+
+  protected <I extends PInput> I getInput(PTransform<I, ?> transform) {
+    checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
+        "can only be called with current transform");
+    @SuppressWarnings("unchecked")
+    I input = (I) currentTransform.getInput();
+    return input;
+  }
+
+  protected <O extends POutput> O getOutput(PTransform<?, O> transform) {
+    checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
+        "can only be called with current transform");
+    @SuppressWarnings("unchecked")
+    O output = (O) currentTransform.getOutput();
+    return output;
+  }
+
+  protected <T> void setOutputRDD(PTransform<?, ?> transform,
+      JavaRDDLike<WindowedValue<T>, ?> rdd) {
+    setRDD((PValue) getOutput(transform), rdd);
+  }
+
+  protected <T> void setOutputRDDFromValues(PTransform<?, ?> transform, Iterable<T> values,
+      Coder<T> coder) {
+    pcollections.put((PValue) getOutput(transform), new RDDHolder<>(values, coder));
+  }
+
+  void setPView(PValue view, Iterable<? extends WindowedValue<?>> value) {
+    pview.put(view, value);
+  }
+
+  protected boolean hasOutputRDD(PTransform<? extends PInput, ?> transform) {
+    PValue pvalue = (PValue) getOutput(transform);
+    return pcollections.containsKey(pvalue);
+  }
+
+  protected JavaRDDLike<?, ?> getRDD(PValue pvalue) {
+    RDDHolder<?> rddHolder = pcollections.get(pvalue);
+    JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
+    leafRdds.remove(rddHolder);
+    if (multireads.contains(pvalue)) {
+      // Ensure the RDD is marked as cached
+      rdd.rdd().cache();
+    } else {
+      multireads.add(pvalue);
+    }
+    return rdd;
+  }
+
+  protected <T> void setRDD(PValue pvalue, JavaRDDLike<WindowedValue<T>, ?> rdd) {
+    try {
+      rdd.rdd().setName(pvalue.getName());
+    } catch (IllegalStateException e) {
+      // name not set, ignore
+    }
+    RDDHolder<T> rddHolder = new RDDHolder<>(rdd);
+    pcollections.put(pvalue, rddHolder);
+    leafRdds.add(rddHolder);
+  }
+
+  JavaRDDLike<?, ?> getInputRDD(PTransform<? extends PInput, ?> transform) {
+    return getRDD((PValue) getInput(transform));
+  }
+
+
+  Iterable<? extends WindowedValue<?>> getPCollectionView(PCollectionView<?> view) {
+    return pview.get(view);
+  }
+
+  /**
+   * Computes the outputs for all RDDs that are leaves in the DAG and do not have any
+   * actions (like saving to a file) registered on them (i.e. they are performed for side
+   * effects).
+   */
+  protected void computeOutputs() {
+    for (RDDHolder<?> rddHolder : leafRdds) {
+      JavaRDDLike<?, ?> rdd = rddHolder.getRDD();
+      rdd.rdd().cache(); // cache so that any subsequent get() is cheap
+      rdd.count(); // force the RDD to be computed
+    }
+  }
+
+  @Override
+  public <T> T get(PValue value) {
+    if (pobjects.containsKey(value)) {
+      @SuppressWarnings("unchecked")
+      T result = (T) pobjects.get(value);
+      return result;
+    }
+    if (pcollections.containsKey(value)) {
+      JavaRDDLike<?, ?> rdd = pcollections.get(value).getRDD();
+      @SuppressWarnings("unchecked")
+      T res = (T) Iterables.getOnlyElement(rdd.collect());
+      pobjects.put(value, res);
+      return res;
+    }
+    throw new IllegalStateException("Cannot resolve un-known PObject: " + value);
+  }
+
+  @Override
+  public <T> T getAggregatorValue(String named, Class<T> resultType) {
+    return runtime.getAggregatorValue(named, resultType);
+  }
+
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
+      throws AggregatorRetrievalException {
+    return runtime.getAggregatorValues(aggregator);
+  }
+
+  @Override
+  public <T> Iterable<T> get(PCollection<T> pcollection) {
+    @SuppressWarnings("unchecked")
+    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
+    return rddHolder.getValues(pcollection);
+  }
+
+  <T> Iterable<WindowedValue<T>> getWindowedValues(PCollection<T> pcollection) {
+    @SuppressWarnings("unchecked")
+    RDDHolder<T> rddHolder = (RDDHolder<T>) pcollections.get(pcollection);
+    return rddHolder.getWindowedValues(pcollection);
+  }
+
+  @Override
+  public void close() {
+    SparkContextFactory.stopSparkContext(jsc);
+  }
+
+  /** The runner is blocking. */
+  @Override
+  public State getState() {
+    return State.DONE;
+  }
+}
diff --git a/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java
new file mode 100644
index 000000000000..aad029aec17d
--- /dev/null
+++ b/runners/spark/src/main/java/com/cloudera/dataflow/spark/EvaluationResult.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+
+package com.cloudera.dataflow.spark;
+
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PValue;
+
+/**
+ * Interface for retrieving the result(s) of running a pipeline. Allows us to translate between
+ * {@code PObject<T>}s or {@code PCollection<T>}s and Ts or collections of Ts.
+ */
+public interface EvaluationResult extends PipelineResult {
+ /**
+ * Retrieves an iterable of results associated with the PCollection passed in.
+ *
+ * @param pcollection Collection we wish to translate.
+   * @param <T> Type of elements contained in collection.
+   * @return Natively typed result associated with collection.
+ */
+ Iterable get(PCollection pcollection);
+
+ /**
+ * Retrieve an object of Type T associated with the PValue passed in.
+ *
+ * @param pval PValue to retrieve associated data for.
+   * @param <T> Type of object to return.
+ * @return Native object.
+ */
+