Closed

Changes from all commits (27 commits)
6c7ed12  Remove StateSampler from the SDK (swegner, Mar 4, 2016)
237da8c  Rename registrar files from dataflow to beam (Apr 21, 2016)
03a1b77  Update README according to dataflow->beam package rename (Apr 21, 2016)
c3567f1  Closes #224 (dhalperi, Apr 21, 2016)
a8ebb51  Wrap Exceptions thrown in StartBundle in the InProcessPipelineRunner (tgroh, Apr 21, 2016)
6dd5d70  This closes #227 (Apr 21, 2016)
53b7652  Convert BaseExecutionContext.getOrCreateStepContext to use specific i… (swegner, Apr 21, 2016)
0a15bf5  Expose IsmFormat.Footer constants as public (swegner, Apr 21, 2016)
4c01c7d  This closes #228 (lukecwik, Apr 22, 2016)
7b175df  KafkaIO: unbounded source for reading from Apache Kafka (Feb 16, 2016)
9210660  Kafka: various fixes (dhalperi, Apr 7, 2016)
640e258  Closes #142 (dhalperi, Apr 22, 2016)
61988f3  Implement Create as An OffsetBasedSource (tgroh, Apr 14, 2016)
59fd4b3  Factor common setup in EncodabilityEnforcementFactoryTest (tgroh, Apr 21, 2016)
a5548f9  Closes #183 (dhalperi, Apr 22, 2016)
8f92b98  [BEAM-50] Fix BigQuery.Write tempFilePrefix concatenation (peihe, Apr 20, 2016)
10e6284  Closes #192 (dhalperi, Apr 22, 2016)
f55bb1d  Fix how this SDK identifies itself to the Cloud Dataflow service (davorbonaci, Apr 20, 2016)
390c5a8  Rename DataflowReleaseInfo to ReleaseInfo (davorbonaci, Apr 21, 2016)
7465edb  This closes #222 (davorbonaci, Apr 22, 2016)
d7239fb  Rename registrar files from dataflow to beam (Apr 21, 2016)
cc32d00  Update README according to dataflow->beam package rename (Apr 21, 2016)
aa1cb76  Remove specific registrar classes and service files (Apr 22, 2016)
fd098fa  Add dependency on AutoService (Apr 22, 2016)
c642c3f  Add SparkRunnerRegistrar as a runner specific registrar that uses Aut… (Apr 22, 2016)
a465c44  Add a unit test for the SparkRunnerRegistrar (Apr 22, 2016)
fd1aaac  Merge branch 'BEAM-213' of https://github.com/amitsela/incubator-beam… (Apr 22, 2016)
File (name not captured):
@@ -89,14 +89,14 @@
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.DataflowReleaseInfo;
 import org.apache.beam.sdk.util.DataflowTransport;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.InstanceBuilder;
 import org.apache.beam.sdk.util.MonitoringUtil;
 import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.PathValidator;
 import org.apache.beam.sdk.util.PropertyNames;
+import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.ValueWithRecordId;
@@ -507,10 +507,10 @@ public DataflowPipelineJob run(Pipeline pipeline) {
     Job newJob = jobSpecification.getJob();
     newJob.setClientRequestId(requestId);

-    String version = DataflowReleaseInfo.getReleaseInfo().getVersion();
+    String version = ReleaseInfo.getReleaseInfo().getVersion();
     System.out.println("Dataflow SDK version: " + version);

-    newJob.getEnvironment().setUserAgent(DataflowReleaseInfo.getReleaseInfo());
+    newJob.getEnvironment().setUserAgent(ReleaseInfo.getReleaseInfo());
     // The Dataflow Service may write to the temporary directory directly, so
     // must be verified.
     if (!Strings.isNullOrEmpty(options.getTempLocation())) {
File (name not captured):
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.runners;

-import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
 import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray;
 import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString;
 import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray;
@@ -34,7 +33,6 @@
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.io.BigQueryIO;
 import org.apache.beam.sdk.io.PubsubIO;
@@ -47,7 +45,6 @@
 import org.apache.beam.sdk.runners.dataflow.ReadTranslator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -844,45 +841,6 @@ private <K, InputT, OutputT> void translateHelper(
           }
         });

-    registerTransformTranslator(
-        Create.Values.class,
-        new TransformTranslator<Create.Values>() {
-          @Override
-          public void translate(
-              Create.Values transform,
-              TranslationContext context) {
-            createHelper(transform, context);
-          }
-
-          private <T> void createHelper(
-              Create.Values<T> transform,
-              TranslationContext context) {
-            context.addStep(transform, "CreateCollection");
-
-            Coder<T> coder = context.getOutput(transform).getCoder();
-            List<CloudObject> elements = new LinkedList<>();
-            for (T elem : transform.getElements()) {
-              byte[] encodedBytes;
-              try {
-                encodedBytes = encodeToByteArray(coder, elem);
-              } catch (CoderException exn) {
-                // TODO: Put in better element printing:
-                // truncate if too long.
-                throw new IllegalArgumentException(
-                    "Unable to encode element '" + elem + "' of transform '" + transform
-                    + "' using coder '" + coder + "'.",
-                    exn);
-              }
-              String encodedJson = byteArrayToJsonString(encodedBytes);
-              assert Arrays.equals(encodedBytes,
-                  jsonStringToByteArray(encodedJson));
-              elements.add(CloudObject.forString(encodedJson));
-            }
-            context.addInput(PropertyNames.ELEMENT, elements);
-            context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
-          }
-        });
-
     registerTransformTranslator(
         Flatten.FlattenPCollectionList.class,
         new TransformTranslator<Flatten.FlattenPCollectionList>() {
File (name not captured):
@@ -21,6 +21,7 @@

 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.startsWith;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
@@ -68,9 +69,9 @@
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.DataflowReleaseInfo;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.NoopPathValidator;
+import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.sdk.util.TestCredential;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -375,10 +376,10 @@ public void testRunWithFiles() throws IOException {
         cloudDataflowDataset,
         workflowJob.getEnvironment().getDataset());
     assertEquals(
-        DataflowReleaseInfo.getReleaseInfo().getName(),
+        ReleaseInfo.getReleaseInfo().getName(),
         workflowJob.getEnvironment().getUserAgent().get("name"));
     assertEquals(
-        DataflowReleaseInfo.getReleaseInfo().getVersion(),
+        ReleaseInfo.getReleaseInfo().getVersion(),
         workflowJob.getEnvironment().getUserAgent().get("version"));
   }

@@ -840,9 +841,16 @@ public void testApplyIsScopedToExactClass() throws IOException {
     CompositeTransformRecorder recorder = new CompositeTransformRecorder();
     p.traverseTopologically(recorder);

-    assertThat("Expected to have seen CreateTimestamped composite transform.",
+    // The recorder will also have seen a Create.Values composite as well, but we can't obtain that
+    // transform.
+    assertThat(
+        "Expected to have seen CreateTimestamped composite transform.",
         recorder.getCompositeTransforms(),
-        Matchers.<PTransform<?, ?>>contains(transform));
+        hasItem(transform));
+    assertThat(
+        "Expected to have two composites, CreateTimestamped and Create.Values",
+        recorder.getCompositeTransforms(),
+        hasItem(Matchers.<PTransform<?, ?>>isA((Class) Create.Values.class)));
   }

  @Test
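A note on the matcher change in the last hunk: Hamcrest's contains() requires the iterable to hold exactly the listed items, in order, while hasItem() only requires at least one matching element. Because the recorder now also sees a Create.Values composite, the stricter matcher would fail. A self-contained sketch of the distinction (the class name and list contents here are illustrative, not from the PR):

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;

import java.util.Arrays;
import java.util.List;

/** Illustrative only: why the test switched from contains() to hasItem(). */
public class MatcherSemanticsSketch {
  public static void main(String[] args) {
    List<String> composites = Arrays.asList("CreateTimestamped", "Create.Values");

    // hasItem passes as long as at least one element matches.
    assertThat(composites, hasItem("CreateTimestamped"));

    // contains("CreateTimestamped") would fail here: it demands that the
    // iterable holds exactly the listed items, in order, and this list now
    // carries a second composite.
  }
}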
File (name not captured):
@@ -751,7 +751,7 @@ public void testToSingletonTranslation() throws Exception {
     assertEquals(2, steps.size());

     Step createStep = steps.get(0);
-    assertEquals("CreateCollection", createStep.getKind());
+    assertEquals("ParallelRead", createStep.getKind());

     Step collectionToSingletonStep = steps.get(1);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
@@ -783,7 +783,7 @@ public void testToIterableTranslation() throws Exception {
     assertEquals(2, steps.size());

     Step createStep = steps.get(0);
-    assertEquals("CreateCollection", createStep.getKind());
+    assertEquals("ParallelRead", createStep.getKind());

     Step collectionToSingletonStep = steps.get(1);
     assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
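For context on the expected step kind changing from "CreateCollection" to "ParallelRead": per the commit "Implement Create as An OffsetBasedSource", the runner no longer emits the bespoke step that embedded JSON-encoded elements in the job spec (the translator deleted above); Create is now translated as a read from a source, so the job spec carries a ParallelRead step. User code is unchanged, as in this minimal sketch (class name and pipeline contents are illustrative, assuming the default runner):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

public class CreateStillWorks {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Identical user code before and after this PR; only the runner-side
    // translation of Create changed (bespoke "CreateCollection" step ->
    // source-backed "ParallelRead" step).
    PCollection<String> words = p.apply(Create.of("hello", "world"));

    p.run();
  }
}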
runners/spark/README.md (6 changes: 3 additions & 3 deletions):
@@ -93,7 +93,7 @@ Switch to the Spark runner directory:
 Then run the [word count example][wc] from the SDK using a single threaded Spark instance
 in local mode:

-    mvn exec:exec -DmainClass=com.google.cloud.dataflow.examples.WordCount \
+    mvn exec:exec -DmainClass=org.apache.beam.examples.WordCount \
         -Dinput=/tmp/kinglear.txt -Doutput=/tmp/out -Drunner=SparkPipelineRunner \
         -DsparkMaster=local

@@ -104,7 +104,7 @@ Check the output by running:
 __Note: running examples using `mvn exec:exec` only works for Spark local mode at the
 moment. See the next section for how to run on a cluster.__

-[wc]: https://github.com/apache/incubator-beam/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/WordCount.java
+[wc]: https://github.com/apache/incubator-beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
 ## Running on a Cluster

 Spark Beam pipelines can be run on a cluster using the `spark-submit` command.
@@ -117,7 +117,7 @@ Then run the word count example using Spark submit with the `yarn-client` master
 (`yarn-cluster` works just as well):

     spark-submit \
-        --class com.google.cloud.dataflow.examples.WordCount \
+        --class org.apache.beam.examples.WordCount \
         --master yarn-client \
         target/spark-runner-*-spark-app.jar \
         --inputFile=kinglear.txt --output=out --runner=SparkPipelineRunner --sparkMaster=yarn-client
runners/spark/pom.xml (6 changes: 6 additions & 0 deletions):
@@ -70,6 +70,12 @@
       <artifactId>guava</artifactId>
       <version>${guava.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.auto.service</groupId>
+      <artifactId>auto-service</artifactId>
+      <version>1.0-rc2</version>
+      <optional>true</optional>
+    </dependency>
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>java-sdk-all</artifactId>
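The dependency is marked <optional> because auto-service is only needed at compile time: its annotation processor writes ServiceLoader descriptor files into the built jar. With the registrar added below, the generated entries would look roughly like this (file paths follow the java.util.ServiceLoader convention; shown for illustration, not part of the diff):

    # service descriptor file, followed by its contents
    # (one fully qualified implementation class per line)
    META-INF/services/org.apache.beam.sdk.runners.PipelineRunnerRegistrar
        org.apache.beam.runners.spark.SparkRunnerRegistrar$Runner
    META-INF/services/org.apache.beam.sdk.options.PipelineOptionsRegistrar
        org.apache.beam.runners.spark.SparkRunnerRegistrar$Options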
New file: SparkRunnerRegistrar.java (61 additions)
@@ -0,0 +1,61 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.beam.runners.spark;

import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;

/**
 * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
 * {@link SparkPipelineRunner}.
 *
 * {@link AutoService} will register Spark's implementations of the {@link PipelineRunner}
 * and {@link PipelineOptions} as available pipeline runner services.
 */
public final class SparkRunnerRegistrar {
  private SparkRunnerRegistrar() {}

  /**
   * Registers the {@link SparkPipelineRunner}.
   */
  @AutoService(PipelineRunnerRegistrar.class)
  public static class Runner implements PipelineRunnerRegistrar {
    @Override
    public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
      return ImmutableList.<Class<? extends PipelineRunner<?>>>of(SparkPipelineRunner.class);
    }
  }

  /**
   * Registers the {@link SparkPipelineOptions} and {@link SparkStreamingPipelineOptions}.
   */
  @AutoService(PipelineOptionsRegistrar.class)
  public static class Options implements PipelineOptionsRegistrar {
    @Override
    public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
      return ImmutableList.<Class<? extends PipelineOptions>>of(
          SparkPipelineOptions.class,
          SparkStreamingPipelineOptions.class);
    }
  }
}
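For reference, registrars published this way are discovered at runtime through java.util.ServiceLoader, which reads the descriptors AutoService generated at compile time. A minimal, self-contained sketch of that lookup (the class RegistrarLookupSketch is illustrative; the SDK's runner and options factories perform an equivalent scan):

import java.util.ServiceLoader;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;

/** Illustrative only: enumerate runner registrars published via @AutoService. */
public class RegistrarLookupSketch {
  public static void main(String[] args) {
    // ServiceLoader scans META-INF/services/ entries on the classpath.
    for (PipelineRunnerRegistrar registrar :
        ServiceLoader.load(PipelineRunnerRegistrar.class)) {
      for (Class<? extends PipelineRunner<?>> runner : registrar.getPipelineRunners()) {
        // With SparkRunnerRegistrar.Runner on the classpath, this prints
        // org.apache.beam.runners.spark.SparkPipelineRunner among the results.
        System.out.println(runner.getName());
      }
    }
  }
}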

4 files deleted (contents not shown in this view).